Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ services:
- neo4j
environment:
TEST_NEO4J_HOST: neo4j
TEST_NEO4J_PORT: 7687 # Add this if your testkit uses it
TEST_NEO4J_USER: neo4j
TEST_NEO4J_PASS: testtest
TEST_DRIVER_NAME: php
Expand Down
96 changes: 80 additions & 16 deletions src/Bolt/BoltConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class BoltConnection implements ConnectionInterface
* @var list<WeakReference<CypherList>>
*/
private array $subscribedResults = [];
private bool $inTransaction = false;

/**
* @return array{0: V4_4|V5|V5_1|V5_2|V5_3|V5_4|null, 1: Connection}
Expand Down Expand Up @@ -206,21 +207,29 @@ public function reset(): void
$this->subscribedResults = [];
}

private function prepareForBegin(): void
{
if (in_array($this->getServerState(), ['STREAMING', 'TX_STREAMING'], true)) {
$this->discardUnconsumedResults();
}
}

/**
* Begins a transaction.
*
* Any of the preconditioned states are: 'READY', 'INTERRUPTED'.
*
* @param iterable<string, scalar|array|null>|null $txMetaData
* @param array<string, scalar|array|null>|null $txMetaData
*/
public function begin(?string $database, ?float $timeout, BookmarkHolder $holder, ?iterable $txMetaData): void
public function begin(?string $database, ?float $timeout, BookmarkHolder $holder, ?array $txMetaData): void
{
$this->consumeResults();

$extra = $this->buildRunExtra($database, $timeout, $holder, AccessMode::WRITE(), $txMetaData);
$extra = $this->buildRunExtra($database, $timeout, $holder, $this->getAccessMode(), $txMetaData);
$message = $this->messageFactory->createBeginMessage($extra);
$response = $message->send()->getResponse();
$this->assertNoFailure($response);
$this->inTransaction = true;
}

/**
Expand Down Expand Up @@ -253,7 +262,11 @@ public function run(
?AccessMode $mode,
?iterable $tsxMetadata,
): array {
$extra = $this->buildRunExtra($database, $timeout, $holder, $mode, $tsxMetadata);
if ($this->isInTransaction()) {
$extra = [];
} else {
$extra = $this->buildRunExtra($database, $timeout, $holder, $mode, $tsxMetadata, false);
}
$message = $this->messageFactory->createRunMessage($text, $parameters, $extra);
$response = $message->send()->getResponse();
$this->assertNoFailure($response);
Expand Down Expand Up @@ -319,9 +332,8 @@ public function close(): void
try {
if ($this->isOpen()) {
if ($this->isStreaming()) {
$this->consumeResults();
$this->discardUnconsumedResults();
}

$message = $this->messageFactory->createGoodbyeMessage();
$message->send();

Expand All @@ -331,7 +343,7 @@ public function close(): void
}
}

private function buildRunExtra(?string $database, ?float $timeout, BookmarkHolder $holder, ?AccessMode $mode, ?iterable $metadata): array
private function buildRunExtra(?string $database, ?float $timeout, BookmarkHolder $holder, ?AccessMode $mode, ?iterable $metadata, bool $forBegin = false): array
{
$extra = [];
if ($database !== null) {
Expand All @@ -341,19 +353,28 @@ private function buildRunExtra(?string $database, ?float $timeout, BookmarkHolde
$extra['tx_timeout'] = (int) ($timeout * 1000);
}

if (!$holder->getBookmark()->isEmpty()) {
$bookmarks = $holder->getBookmark()->values();
if (!empty($bookmarks)) {
$extra['bookmarks'] = $holder->getBookmark()->values();
}

if ($mode) {
$extra['mode'] = AccessMode::WRITE() === $mode ? 'w' : 'r';
}
if ($forBegin) {
$bookmarks = $holder->getBookmark()->values();
if (!empty($bookmarks)) {
$extra['bookmarks'] = $bookmarks;
}

if ($metadata !== null) {
$metadataArray = $metadata instanceof Traversable ? iterator_to_array($metadata) : $metadata;
if (count($metadataArray) > 0) {
$extra['tx_metadata'] = $metadataArray;
if ($mode !== null) {
$extra['mode'] = $mode === AccessMode::WRITE() ? 'w' : 'r';
}

if ($metadata !== null) {
$metadataArray = $metadata instanceof Traversable ? iterator_to_array($metadata) : $metadata;
if (!empty($metadataArray)) {
$extra['tx_metadata'] = $metadataArray;
}
}

}

return $extra;
Expand All @@ -362,11 +383,13 @@ private function buildRunExtra(?string $database, ?float $timeout, BookmarkHolde
private function buildResultExtra(?int $fetchSize, ?int $qid): array
{
$extra = [];
$fetchSize = 1000;
/** @psalm-suppress RedundantCondition */
if ($fetchSize !== null) {
$extra['n'] = $fetchSize;
}

if ($qid !== null) {
if ($qid !== null && $qid >= 0) {
$extra['qid'] = $qid;
}

Expand Down Expand Up @@ -405,4 +428,45 @@ private function assertNoFailure(Response $response): void
throw Neo4jException::fromBoltResponse($response);
}
}

/**
* Discard unconsumed results - sends DISCARD to server for each subscribed result.
*/
public function discardUnconsumedResults(): void
{
$this->logger?->log(LogLevel::DEBUG, 'Discarding unconsumed results');
$this->subscribedResults = array_values(array_filter(
$this->subscribedResults,
static fn (WeakReference $ref): bool => $ref->get() !== null
));

if (empty($this->subscribedResults)) {
$this->logger?->log(LogLevel::DEBUG, 'No unconsumed results to discard');

return;
}

$state = $this->getServerState();
$this->logger?->log(LogLevel::DEBUG, "Server state before discard: {$state}");

try {
if (in_array($state, ['STREAMING', 'TX_STREAMING'], true)) {
$this->discard(null);
$this->logger?->log(LogLevel::DEBUG, 'Sent DISCARD ALL for unconsumed results');
} else {
$this->logger?->log(LogLevel::DEBUG, 'Skipping discard - server not in streaming state');
}
} catch (Throwable $e) {
$this->logger?->log(LogLevel::ERROR, 'Failed to discard results', [
'exception' => $e->getMessage(),
]);
}

$this->subscribedResults = [];
}

private function isInTransaction(): bool
{
return $this->inTransaction;
}
}
13 changes: 12 additions & 1 deletion src/Bolt/Session.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
*/
final class Session implements SessionInterface
{
/** @var list<BoltConnection> */
private array $usedConnections = [];
/** @psalm-readonly */
private readonly BookmarkHolder $bookmarkHolder;

Expand Down Expand Up @@ -178,6 +180,7 @@ private function acquireConnection(TransactionConfiguration $config, SessionConf
$timeout = ($timeout < 30) ? 30 : $timeout;
$connection->setTimeout($timeout + 2);
}
$this->usedConnections[] = $connection;

return $connection;
}
Expand All @@ -187,14 +190,14 @@ private function startTransaction(TransactionConfiguration $config, SessionConfi
$this->getLogger()?->log(LogLevel::INFO, 'Starting transaction', ['config' => $config, 'sessionConfig' => $sessionConfig]);
try {
$connection = $this->acquireConnection($config, $sessionConfig);

$connection->begin($this->config->getDatabase(), $config->getTimeout(), $this->bookmarkHolder, $config->getMetaData());
} catch (Neo4jException $e) {
if (isset($connection) && $connection->getServerState() === 'FAILED') {
$connection->reset();
}
throw $e;
}
error_log('>>> EXIT startTransaction()');

return new BoltUnmanagedTransaction(
$this->config->getDatabase(),
Expand All @@ -217,6 +220,14 @@ public function getLastBookmark(): Bookmark
return $this->bookmarkHolder->getBookmark();
}

public function close(): void
{
foreach ($this->usedConnections as $connection) {
$connection->discardUnconsumedResults();
}
$this->usedConnections = [];
}

private function getLogger(): ?Neo4jLogger
{
return $this->pool->getLogger();
Expand Down
26 changes: 17 additions & 9 deletions src/Databags/TransactionConfiguration.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ final class TransactionConfiguration
public const DEFAULT_METADATA = '[]';

/**
* @param float|null $timeout timeout in seconds
* @param iterable<string, scalar|array|null>|null $metaData
* @param float|null $timeout timeout in seconds
* @param array<array-key, mixed>|null $metaData
*/
public function __construct(
private ?float $timeout = null,
private ?iterable $metaData = null,
private ?array $metaData = null,
) {
}

Expand All @@ -41,7 +41,7 @@ public function __construct(
*/
public static function create(?float $timeout = null, ?iterable $metaData = null): self
{
return new self($timeout, $metaData);
return new self($timeout, $metaData !== null ? (array) $metaData : null);
}

/**
Expand All @@ -53,11 +53,9 @@ public static function default(): self
}

/**
* Get the configured transaction metadata.
*
* @return iterable<string, scalar|array|null>|null
* @return array<string, scalar|array|null>|null
*/
public function getMetaData(): ?iterable
public function getMetaData(): ?array
{
return $this->metaData;
}
Expand All @@ -69,6 +67,15 @@ public function getTimeout(): ?float
{
return $this->timeout;
}
/**
* Get the configured bookmarks for causal consistency.
*
* @return array<string>|null
*/
public function getBookmarks(): ?array
{
return $this->bookmarks;
}

/**
* Creates a new transaction object with the provided timeout.
Expand All @@ -87,7 +94,7 @@ public function withTimeout(?float $timeout): self
*/
public function withMetaData(?iterable $metaData): self
{
return new self($this->timeout, $metaData);
return new self($this->timeout, $metaData !== null ? (array) $metaData : null);
}

/**
Expand All @@ -101,6 +108,7 @@ public function merge(?TransactionConfiguration $config): self

$metaData = $config->metaData;
if ($metaData !== null) {
/** @psalm-suppress PossiblyInvalidArgument */
$tsxConfig = $tsxConfig->withMetaData($metaData);
}
$timeout = $config->timeout;
Expand Down
7 changes: 6 additions & 1 deletion src/Formatter/Specialised/BoltOGMTranslator.php
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ private function makeFromBoltRelationship(BoltRelationship $rel): Relationship
foreach ($rel->properties as $key => $property) {
$map[$key] = $this->mapValueToType($property);
}
/** @var string|null $elementId */
$startNodeElementId = null;
$endNodeElementId = null;

/** @var string|null $elementId */
$elementId = null;
Expand All @@ -191,7 +194,9 @@ private function makeFromBoltRelationship(BoltRelationship $rel): Relationship
$rel->endNodeId,
$rel->type,
new CypherMap($map),
$elementId
$elementId,
$startNodeElementId, // Add this parameter
$endNodeElementId
);
}

Expand Down
9 changes: 8 additions & 1 deletion src/Types/CypherList.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,19 @@ class CypherList implements CypherSequence, Iterator, ArrayAccess
* @use CypherSequenceTrait<TValue>
*/
use CypherSequenceTrait;
private ?int $qid = null;

/**
* @param iterable<mixed, TValue>|callable():Generator<mixed, TValue> $iterable
*
* @psalm-mutation-free
*/
public function __construct(iterable|callable $iterable = [])
public function __construct(iterable|callable $iterable = [], ?int $qid = null)
{
if (is_array($iterable)) {
$iterable = new ArrayIterator($iterable);
}
$this->qid = $qid;

$this->generator = static function () use ($iterable): Generator {
$i = 0;
Expand Down Expand Up @@ -425,4 +427,9 @@ public function each(callable $callable): self

return $this;
}

public function getQid(): ?int
{
return $this->qid;
}
}
12 changes: 12 additions & 0 deletions src/Types/Relationship.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
*/
final class Relationship extends UnboundRelationship
{
private string $startNodeElementId;
private string $endNodeElementId;

/**
* @param CypherMap<OGMTypes> $properties
*/
Expand All @@ -34,8 +37,17 @@ public function __construct(
string $type,
CypherMap $properties,
?string $elementId,
int|string|null $startNodeElementId = null,
int|string|null $endNodeElementId = null,
) {
parent::__construct($id, $type, $properties, $elementId);
$this->startNodeElementId = $startNodeElementId !== null
? (string) $startNodeElementId
: (string) $startNodeId;

$this->endNodeElementId = $endNodeElementId !== null
? (string) $endNodeElementId
: (string) $endNodeId;
}

/**
Expand Down
Loading
Loading