Skip to content

Commit 1f7bdc8

Browse files
committed
added bookmark updates after every query consumption
1 parent 4bafd9a commit 1f7bdc8

11 files changed

+67
-23
lines changed

src/Basic/Session.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Laudis\Neo4j\Basic;
1313

1414
use Laudis\Neo4j\Contracts\SessionInterface;
15+
use Laudis\Neo4j\Databags\Bookmark;
1516
use Laudis\Neo4j\Databags\Statement;
1617
use Laudis\Neo4j\Databags\SummarizedResult;
1718
use Laudis\Neo4j\Databags\TransactionConfiguration;
@@ -92,4 +93,9 @@ public function transaction(callable $tsxHandler, ?TransactionConfiguration $con
9293
{
9394
return $this->session->writeTransaction($tsxHandler, $config);
9495
}
96+
97+
public function getLastBookmark(): Bookmark
98+
{
99+
return $this->session->getLastBookmark();
100+
}
95101
}

src/Bolt/BoltResult.php

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ final class BoltResult implements Iterator
3131
/** @var list<list> */
3232
private array $rows = [];
3333
private ?array $meta = null;
34-
/** @var (callable(array):void)|null */
35-
private $finishedCallback;
34+
/** @var list<(callable(array):void)> */
35+
private array $finishedCallbacks = [];
3636
private int $qid;
3737

3838
public function __construct(BoltConnection $connection, int $fetchSize, int $qid)
@@ -52,9 +52,9 @@ public function getFetchSize(): int
5252
/**
5353
* @param callable(array):void $finishedCallback
5454
*/
55-
public function setFinishedCallback(callable $finishedCallback): void
55+
public function addFinishedCallback(callable $finishedCallback): void
5656
{
57-
$this->finishedCallback = $finishedCallback;
57+
$this->finishedCallbacks[] = $finishedCallback;
5858
}
5959

6060
/**
@@ -83,8 +83,8 @@ public function iterator(): Generator
8383
}
8484
}
8585

86-
if ($this->finishedCallback) {
87-
call_user_func($this->finishedCallback, $this->meta);
86+
foreach ($this->finishedCallbacks as $finishedCallback) {
87+
$finishedCallback($this->meta);
8888
}
8989
}
9090

src/Bolt/BoltUnmanagedTransaction.php

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Laudis\Neo4j\Common\TransactionHelper;
1919
use Laudis\Neo4j\Contracts\FormatterInterface;
2020
use Laudis\Neo4j\Contracts\UnmanagedTransactionInterface;
21+
use Laudis\Neo4j\Databags\BookmarkHolder;
2122
use Laudis\Neo4j\Databags\SessionConfiguration;
2223
use Laudis\Neo4j\Databags\Statement;
2324
use Laudis\Neo4j\Databags\TransactionConfiguration;
@@ -55,17 +56,25 @@ final class BoltUnmanagedTransaction implements UnmanagedTransactionInterface
5556
private bool $isCommitted = false;
5657
private SessionConfiguration $config;
5758
private TransactionConfiguration $tsxConfig;
59+
private BookmarkHolder $bookmarkHolder;
5860

5961
/**
6062
* @param FormatterInterface<T> $formatter
6163
*/
62-
public function __construct(?string $database, FormatterInterface $formatter, BoltConnection $connection, SessionConfiguration $config, TransactionConfiguration $tsxConfig)
63-
{
64+
public function __construct(
65+
?string $database,
66+
FormatterInterface $formatter,
67+
BoltConnection $connection,
68+
SessionConfiguration $config,
69+
TransactionConfiguration $tsxConfig,
70+
BookmarkHolder $bookmarkHolder
71+
) {
6472
$this->formatter = $formatter;
6573
$this->connection = $connection;
6674
$this->database = $database;
6775
$this->config = $config;
6876
$this->tsxConfig = $tsxConfig;
77+
$this->bookmarkHolder = $bookmarkHolder;
6978
}
7079

7180
public function commit(iterable $statements = []): CypherList
@@ -138,7 +147,8 @@ public function runStatement(Statement $statement)
138147
$this->connection,
139148
$start,
140149
$run - $start,
141-
$statement
150+
$statement,
151+
$this->bookmarkHolder
142152
);
143153
}
144154

src/Bolt/Session.php

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
use Laudis\Neo4j\Contracts\SessionInterface;
2323
use Laudis\Neo4j\Contracts\TransactionInterface;
2424
use Laudis\Neo4j\Contracts\UnmanagedTransactionInterface;
25+
use Laudis\Neo4j\Databags\Bookmark;
26+
use Laudis\Neo4j\Databags\BookmarkHolder;
2527
use Laudis\Neo4j\Databags\SessionConfiguration;
2628
use Laudis\Neo4j\Databags\Statement;
2729
use Laudis\Neo4j\Databags\TransactionConfiguration;
@@ -58,6 +60,7 @@ final class Session implements SessionInterface
5860
private UriInterface $uri;
5961
/** @psalm-readonly */
6062
private AuthenticateInterface $auth;
63+
private BookmarkHolder $bookmarkHolder;
6164

6265
/**
6366
* @param FormatterInterface<ResultFormat> $formatter
@@ -77,6 +80,7 @@ public function __construct(
7780
$this->formatter = $formatter;
7881
$this->uri = $uri;
7982
$this->auth = $auth;
83+
$this->bookmarkHolder = new BookmarkHolder(Bookmark::from($config->getBookmarks()));
8084
}
8185

8286
public function runStatements(iterable $statements, ?TransactionConfiguration $config = null): CypherList
@@ -150,7 +154,7 @@ private function beginInstantTransaction(
150154
): TransactionInterface {
151155
$connection = $this->acquireConnection($tsxConfig, $config);
152156

153-
return new BoltUnmanagedTransaction($this->config->getDatabase(), $this->formatter, $connection, $this->config, $tsxConfig);
157+
return new BoltUnmanagedTransaction($this->config->getDatabase(), $this->formatter, $connection, $this->config, $tsxConfig, $this->bookmarkHolder);
154158
}
155159

156160
/**
@@ -184,11 +188,16 @@ private function startTransaction(TransactionConfiguration $config, SessionConfi
184188
throw Neo4jException::fromMessageException($e);
185189
}
186190

187-
return new BoltUnmanagedTransaction($this->config->getDatabase(), $this->formatter, $connection, $this->config, $config);
191+
return new BoltUnmanagedTransaction($this->config->getDatabase(), $this->formatter, $connection, $this->config, $config, $this->bookmarkHolder);
188192
}
189193

190194
private function mergeTsxConfig(?TransactionConfiguration $config): TransactionConfiguration
191195
{
192196
return TransactionConfiguration::default()->merge($config);
193197
}
198+
199+
public function getLastBookmark(): Bookmark
200+
{
201+
return $this->bookmarkHolder->getBookmark();
202+
}
194203
}

src/Contracts/FormatterInterface.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
use JsonException;
1818
use Laudis\Neo4j\Bolt\BoltConnection;
1919
use Laudis\Neo4j\Bolt\BoltResult;
20+
use Laudis\Neo4j\Databags\BookmarkHolder;
2021
use Laudis\Neo4j\Databags\Statement;
2122
use Laudis\Neo4j\Http\HttpConnection;
2223
use Laudis\Neo4j\Types\CypherList;
@@ -77,7 +78,7 @@ interface FormatterInterface
7778
*
7879
* @return ResultFormat
7980
*/
80-
public function formatBoltResult(array $meta, BoltResult $result, BoltConnection $connection, float $runStart, float $resultAvailableAfter, Statement $statement);
81+
public function formatBoltResult(array $meta, BoltResult $result, BoltConnection $connection, float $runStart, float $resultAvailableAfter, Statement $statement, BookmarkHolder $holder);
8182

8283
/**
8384
* Formats the results of the HTTP protocol to the unified format.

src/Contracts/SessionInterface.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,5 +97,5 @@ public function readTransaction(callable $tsxHandler, ?TransactionConfiguration
9797
*/
9898
public function transaction(callable $tsxHandler, ?TransactionConfiguration $config = null);
9999

100-
// public function getLastBookmark(): Bookmark;
100+
public function getLastBookmark(): Bookmark;
101101
}

src/Databags/BookmarkHolder.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ final class BookmarkHolder
1717
{
1818
private Bookmark $bookmark;
1919

20-
public function __construct(?Bookmark $bookmark = null)
20+
public function __construct(Bookmark $bookmark = null)
2121
{
22-
$this->bookmark = $bookmark ?? new Bookmark();
22+
$this->bookmark = $bookmark;
2323
}
2424

2525
public function getBookmark(): Bookmark

src/Databags/SessionConfiguration.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ final class SessionConfiguration
4444
* @param string|null $database
4545
* @param pure-callable():(int|null)|int|null $fetchSize
4646
* @param pure-callable():(AccessMode|null)|AccessMode|null $defaultAccessMode
47-
* @param pure-callable():(iterable<string>|null)|iterable<string>|null $bookmarks
47+
* @param pure-callable():(iterable<Bookmark>|null)|iterable<Bookmark>|null $bookmarks
4848
*/
4949
public function __construct(
5050
$database = null,
@@ -152,6 +152,8 @@ public function getDatabase(): ?string
152152

153153
/**
154154
* Returns the initial bookmarks.
155+
*
156+
* @return iterable<Bookmark>
155157
*/
156158
public function getBookmarks(): iterable
157159
{

src/Formatter/OGMFormatter.php

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
use Laudis\Neo4j\Bolt\BoltResult;
1818
use Laudis\Neo4j\Contracts\ConnectionInterface;
1919
use Laudis\Neo4j\Contracts\FormatterInterface;
20+
use Laudis\Neo4j\Databags\Bookmark;
21+
use Laudis\Neo4j\Databags\BookmarkHolder;
2022
use Laudis\Neo4j\Databags\Statement;
2123
use Laudis\Neo4j\Formatter\Specialised\BoltOGMTranslator;
2224
use Laudis\Neo4j\Formatter\Specialised\JoltHttpOGMTranslator;
@@ -26,6 +28,7 @@
2628
use Psr\Http\Message\RequestInterface;
2729
use Psr\Http\Message\ResponseInterface;
2830
use stdClass;
31+
use function array_key_exists;
2932
use function version_compare;
3033

3134
/**
@@ -72,7 +75,7 @@ public static function create(): OGMFormatter
7275
*
7376
* @return CypherList<CypherMap<OGMTypes>>
7477
*/
75-
public function formatBoltResult(array $meta, BoltResult $result, BoltConnection $connection, float $runStart, float $resultAvailableAfter, Statement $statement): CypherList
78+
public function formatBoltResult(array $meta, BoltResult $result, BoltConnection $connection, float $runStart, float $resultAvailableAfter, Statement $statement, BookmarkHolder $holder): CypherList
7679
{
7780
$tbr = (new CypherList(function () use ($result, $meta) {
7881
foreach ($result as $row) {
@@ -81,6 +84,11 @@ public function formatBoltResult(array $meta, BoltResult $result, BoltConnection
8184
}))->withCacheLimit($result->getFetchSize());
8285

8386
$connection->subscribeResult($tbr);
87+
$result->addFinishedCallback(function (array $response) use ($holder) {
88+
if (array_key_exists('bookmark', $response) && is_string($response['bookmark'])) {
89+
$holder->setBookmark(new Bookmark([$response['bookmark']]));
90+
}
91+
});
8492

8593
return $tbr;
8694
}

src/Formatter/SummarizedResultFormatter.php

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313

1414
namespace Laudis\Neo4j\Formatter;
1515

16+
use Laudis\Neo4j\Databags\Bookmark;
17+
use Laudis\Neo4j\Databags\BookmarkHolder;
1618
use function in_array;
1719
use function is_int;
1820
use Laudis\Neo4j\Bolt\BoltConnection;
@@ -164,16 +166,16 @@ public function formatBoltStats(array $response): SummaryCounters
164166
);
165167
}
166168

167-
public function formatBoltResult(array $meta, BoltResult $result, BoltConnection $connection, float $runStart, float $resultAvailableAfter, Statement $statement): SummarizedResult
169+
public function formatBoltResult(array $meta, BoltResult $result, BoltConnection $connection, float $runStart, float $resultAvailableAfter, Statement $statement, BookmarkHolder $holder): SummarizedResult
168170
{
169171
/** @var ResultSummary|null $summary */
170172
$summary = null;
171-
$result->setFinishedCallback(function (array $counters) use ($connection, $statement, $runStart, $resultAvailableAfter, &$summary) {
172-
/** @var BoltCypherStats $counters */
173-
$stats = $this->formatBoltStats($counters);
173+
$result->addFinishedCallback(function (array $response) use ($connection, $statement, $runStart, $resultAvailableAfter, &$summary) {
174+
/** @var BoltCypherStats $response */
175+
$stats = $this->formatBoltStats($response);
174176
$resultConsumedAfter = microtime(true) - $runStart;
175177
/** @var string */
176-
$db = $counters['db'] ?? '';
178+
$db = $response['db'] ?? '';
177179
$summary = new ResultSummary(
178180
$stats,
179181
new DatabaseInfo($db),
@@ -192,7 +194,7 @@ public function formatBoltResult(array $meta, BoltResult $result, BoltConnection
192194
);
193195
});
194196

195-
$formattedResult = $this->formatter->formatBoltResult($meta, $result, $connection, $runStart, $resultAvailableAfter, $statement);
197+
$formattedResult = $this->formatter->formatBoltResult($meta, $result, $connection, $runStart, $resultAvailableAfter, $statement, $holder);
196198

197199
/**
198200
* @psalm-suppress MixedArgument

0 commit comments

Comments
 (0)