Skip to content

Feat/tesktkit session run #268

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion src/Bolt/BoltConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ public function close(): void
try {
if ($this->isOpen()) {
if ($this->isStreaming()) {
$this->consumeResults();
$this->discardUnconsumedResults();
}

$message = $this->messageFactory->createGoodbyeMessage();
Expand Down Expand Up @@ -405,4 +405,30 @@ private function assertNoFailure(Response $response): void
throw Neo4jException::fromBoltResponse($response);
}
}

/**
* Discard unconsumed results - sends DISCARD to server for each subscribed result.
*/
public function discardUnconsumedResults(): void
{
$this->logger?->log(LogLevel::DEBUG, 'Discarding unconsumed results');

$this->subscribedResults = array_values(array_filter(
$this->subscribedResults,
static fn (WeakReference $ref): bool => $ref->get() !== null
));

if (!empty($this->subscribedResults)) {
try {
$this->discard(null);
$this->logger?->log(LogLevel::DEBUG, 'Sent DISCARD ALL for unconsumed results');
} catch (Throwable $e) {
$this->logger?->log(LogLevel::ERROR, 'Failed to discard results', [
'exception' => $e->getMessage(),
]);
}
}

$this->subscribedResults = [];
}
}
11 changes: 11 additions & 0 deletions src/Bolt/Session.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
*/
final class Session implements SessionInterface
{
/** @var list<BoltConnection> */
private array $usedConnections = [];
/** @psalm-readonly */
private readonly BookmarkHolder $bookmarkHolder;

Expand Down Expand Up @@ -178,6 +180,7 @@ private function acquireConnection(TransactionConfiguration $config, SessionConf
$timeout = ($timeout < 30) ? 30 : $timeout;
$connection->setTimeout($timeout + 2);
}
$this->usedConnections[] = $connection;

return $connection;
}
Expand Down Expand Up @@ -217,6 +220,14 @@ public function getLastBookmark(): Bookmark
return $this->bookmarkHolder->getBookmark();
}

public function close(): void
{
foreach ($this->usedConnections as $connection) {
$connection->discardUnconsumedResults();
}
$this->usedConnections = [];
}

private function getLogger(): ?Neo4jLogger
{
return $this->pool->getLogger();
Expand Down
7 changes: 6 additions & 1 deletion src/Formatter/Specialised/BoltOGMTranslator.php
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ private function makeFromBoltRelationship(BoltRelationship $rel): Relationship
foreach ($rel->properties as $key => $property) {
$map[$key] = $this->mapValueToType($property);
}
/** @var string|null $elementId */
$startNodeElementId = null;
$endNodeElementId = null;

/** @var string|null $elementId */
$elementId = null;
Expand All @@ -191,7 +194,9 @@ private function makeFromBoltRelationship(BoltRelationship $rel): Relationship
$rel->endNodeId,
$rel->type,
new CypherMap($map),
$elementId
$elementId,
$startNodeElementId, // Add this parameter
$endNodeElementId
);
}

Expand Down
9 changes: 8 additions & 1 deletion src/Types/CypherList.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,19 @@ class CypherList implements CypherSequence, Iterator, ArrayAccess
* @use CypherSequenceTrait<TValue>
*/
use CypherSequenceTrait;
private ?int $qid = null;

/**
* @param iterable<mixed, TValue>|callable():Generator<mixed, TValue> $iterable
*
* @psalm-mutation-free
*/
public function __construct(iterable|callable $iterable = [])
public function __construct(iterable|callable $iterable = [], ?int $qid = null)
{
if (is_array($iterable)) {
$iterable = new ArrayIterator($iterable);
}
$this->qid = $qid;

$this->generator = static function () use ($iterable): Generator {
$i = 0;
Expand Down Expand Up @@ -425,4 +427,9 @@ public function each(callable $callable): self

return $this;
}

public function getQid(): ?int
{
return $this->qid;
}
}
12 changes: 12 additions & 0 deletions src/Types/Relationship.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
*/
final class Relationship extends UnboundRelationship
{
private string $startNodeElementId;
private string $endNodeElementId;

/**
* @param CypherMap<OGMTypes> $properties
*/
Expand All @@ -34,8 +37,17 @@ public function __construct(
string $type,
CypherMap $properties,
?string $elementId,
int|string|null $startNodeElementId = null,
int|string|null $endNodeElementId = null,
) {
parent::__construct($id, $type, $properties, $elementId);
$this->startNodeElementId = $startNodeElementId !== null
? (string) $startNodeElementId
: (string) $startNodeId;

$this->endNodeElementId = $endNodeElementId !== null
? (string) $endNodeElementId
: (string) $endNodeId;
}

/**
Expand Down
14 changes: 12 additions & 2 deletions src/Types/UnboundRelationship.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,15 @@ class UnboundRelationship extends AbstractPropertyObject
/**
* @param CypherMap<OGMTypes> $properties
*/
private string $elementId;

public function __construct(
private readonly int $id,
private readonly string $type,
private readonly CypherMap $properties,
private readonly ?string $elementId,
?string $elementId = null,
) {
$this->elementId = $elementId ?? (string) $id;
}

public function getElementId(): ?string
Expand All @@ -55,6 +58,9 @@ public function getType(): string
return $this->type;
}

/**
* @psalm-suppress MixedReturnTypeCoercion
*/
public function getProperties(): CypherMap
{
/** @psalm-suppress InvalidReturnStatement false positive with type alias. */
Expand All @@ -80,7 +86,11 @@ public function toArray(): array
*
* @return OGMTypes
*/
public function getProperty(string $key)

/**
* @psalm-suppress MixedReturnStatement
*/
public function getProperty(string $key): string
{
/** @psalm-suppress ImpureMethodCall */
if (!$this->properties->hasKey($key)) {
Expand Down
Loading