Skip to content

Commit 219999c

Browse files
Merge pull request #139 from neo4j-php/open-streams-counter
added track of open streams in transaction. updated server state hand…
2 parents f6d1ad8 + f442bcd commit 219999c

File tree

7 files changed

+75
-12
lines changed

7 files changed

+75
-12
lines changed

src/protocol/AProtocol.php

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,14 @@ abstract class AProtocol
2727

2828
public ServerState $serverState;
2929

30+
/**
31+
* Multiple RUN statements in transaction generates "streams" which are pulled or discarded
32+
* We are keeping track of open streams to keep valid Server State
33+
* @link https://www.neo4j.com/docs/bolt/current/bolt/message/#transaction
34+
* @var int
35+
*/
36+
protected int $openStreams = 0;
37+
3038
/**
3139
* @throws UnpackException
3240
* @throws PackException
@@ -141,8 +149,10 @@ public function getResponse(): Response
141149
foreach (($this->serverStateTransition ?? []) as $transition) {
142150
if ($transition[0] === $serverState && $transition[1] === $response->message && $transition[2] === $response->signature) {
143151
$this->serverState = $transition[3];
144-
if ($response->signature === Signature::SUCCESS && ($response->content['has_more'] ?? false))
145-
$this->serverState = ($serverState === ServerState::TX_READY || $serverState === ServerState::TX_STREAMING) ? ServerState::TX_STREAMING : ServerState::STREAMING;
152+
if (in_array($response->message, [Message::PULL, Message::DISCARD], true)
153+
&& $response->signature === Signature::SUCCESS
154+
&& (($response->content['has_more'] ?? false) || $this->openStreams))
155+
$this->serverState = $this->serverState === ServerState::TX_READY ? ServerState::TX_STREAMING : ServerState::STREAMING;
146156
if ($transition[3] === ServerState::DEFUNCT)
147157
$this->connection->disconnect();
148158
break;

src/protocol/v3/CommitMessage.php

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
namespace Bolt\protocol\v3;
44

55
use Bolt\enum\Message;
6-
use Bolt\protocol\{V3, V4, V4_1, V4_2, V4_3, V4_4, V5, V5_1, V5_2, V5_3, V5_4};
6+
use Bolt\protocol\{Response, V3, V4, V4_1, V4_2, V4_3, V4_4, V5, V5_1, V5_2, V5_3, V5_4};
77
use Bolt\error\BoltException;
88

99
trait CommitMessage
@@ -21,4 +21,16 @@ public function commit(): V3|V4|V4_1|V4_2|V4_3|V4_4|V5|V5_1|V5_2|V5_3|V5_4
2121
$this->pipelinedMessages[] = Message::COMMIT;
2222
return $this;
2323
}
24+
25+
/**
26+
* Read COMMIT response
27+
* @return iterable
28+
* @throws BoltException
29+
*/
30+
protected function _commit(): iterable
31+
{
32+
$this->openStreams = 0;
33+
$content = $this->read($signature);
34+
yield new Response(Message::COMMIT, $signature, $content);
35+
}
2436
}

src/protocol/v3/RollbackMessage.php

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
namespace Bolt\protocol\v3;
44

55
use Bolt\enum\Message;
6-
use Bolt\protocol\{V3, V4, V4_1, V4_2, V4_3, V4_4, V5, V5_1, V5_2, V5_3, V5_4};
6+
use Bolt\protocol\{Response, V3, V4, V4_1, V4_2, V4_3, V4_4, V5, V5_1, V5_2, V5_3, V5_4};
77
use Bolt\error\BoltException;
88

99
trait RollbackMessage
@@ -21,4 +21,16 @@ public function rollback(): V3|V4|V4_1|V4_2|V4_3|V4_4|V5|V5_1|V5_2|V5_3|V5_4
2121
$this->pipelinedMessages[] = Message::ROLLBACK;
2222
return $this;
2323
}
24+
25+
/**
26+
* Read ROLLBACK response
27+
* @return iterable
28+
* @throws BoltException
29+
*/
30+
protected function _rollback(): iterable
31+
{
32+
$this->openStreams = 0;
33+
$content = $this->read($signature);
34+
yield new Response(Message::ROLLBACK, $signature, $content);
35+
}
2436
}

src/protocol/v3/RunMessage.php

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
namespace Bolt\protocol\v3;
44

55
use Bolt\enum\Message;
6-
use Bolt\protocol\{V3, V4, V4_1, V4_2, V4_3, V4_4, V5, V5_1, V5_2, V5_3, V5_4};
6+
use Bolt\protocol\{Response, V3, V4, V4_1, V4_2, V4_3, V4_4, V5, V5_1, V5_2, V5_3, V5_4};
77
use Bolt\error\BoltException;
88

99
trait RunMessage
@@ -26,4 +26,17 @@ public function run(string $query, array $parameters = [], array $extra = []): V
2626
$this->pipelinedMessages[] = Message::RUN;
2727
return $this;
2828
}
29+
30+
/**
31+
* Read RUN response
32+
* @return iterable
33+
* @throws BoltException
34+
*/
35+
protected function _run(): iterable
36+
{
37+
$content = $this->read($signature);
38+
if (array_key_exists('qid', $content))
39+
$this->openStreams++;
40+
yield new Response(Message::RUN, $signature, $content);
41+
}
2942
}

src/protocol/v4/DiscardMessage.php

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
namespace Bolt\protocol\v4;
44

55
use Bolt\enum\Message;
6-
use Bolt\protocol\{V4, V4_1, V4_2, V4_3, V4_4, V5, V5_1, V5_2, V5_3, V5_4};
6+
use Bolt\enum\Signature;
7+
use Bolt\protocol\{Response, V4, V4_1, V4_2, V4_3, V4_4, V5, V5_1, V5_2, V5_3, V5_4};
78
use Bolt\error\BoltException;
89

910
trait DiscardMessage
@@ -24,4 +25,17 @@ public function discard(array $extra = []): V4|V4_1|V4_2|V4_3|V4_4|V5|V5_1|V5_2|
2425
$this->pipelinedMessages[] = Message::DISCARD;
2526
return $this;
2627
}
28+
29+
/**
30+
* Read DISCARD response
31+
* @return iterable
32+
* @throws BoltException
33+
*/
34+
protected function _discard(): iterable
35+
{
36+
$content = $this->read($signature);
37+
if (!($content['has_more'] ?? false) && $this->openStreams)
38+
$this->openStreams = $signature === Signature::SUCCESS ? $this->openStreams - 1 : 0;
39+
yield new Response(Message::DISCARD, $signature, $content);
40+
}
2741
}

src/protocol/v4/PullMessage.php

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,20 @@ public function pull(array $extra = []): V4|V4_1|V4_2|V4_3|V4_4|V5|V5_1|V5_2|V5_
2626
}
2727

2828
/**
29-
* Read PULL response
30-
* @return array
29+
* Read PULL responses
30+
* @return iterable
3131
* @throws BoltException
3232
*/
3333
protected function _pull(): iterable
3434
{
3535
do {
3636
$content = $this->read($signature);
37+
if (!($content['has_more'] ?? false) && $this->openStreams) {
38+
if ($signature === Signature::SUCCESS)
39+
$this->openStreams--;
40+
elseif ($signature === Signature::FAILURE)
41+
$this->openStreams = 0;
42+
}
3743
yield new Response(Message::PULL, $signature, $content);
3844
} while ($signature == Signature::RECORD);
3945
}

src/protocol/v5_1/ServerStateTransition.php

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,8 @@ trait ServerStateTransition
2222
[ServerState::READY, Message::ROUTE, Signature::SUCCESS, ServerState::READY],
2323
[ServerState::READY, Message::RESET, Signature::SUCCESS, ServerState::READY],
2424
[ServerState::READY, Message::RESET, Signature::FAILURE, ServerState::DEFUNCT],
25-
[ServerState::STREAMING, Message::PULL, Signature::SUCCESS, ServerState::STREAMING],
2625
[ServerState::STREAMING, Message::PULL, Signature::SUCCESS, ServerState::READY],
2726
[ServerState::STREAMING, Message::PULL, Signature::FAILURE, ServerState::FAILED],
28-
[ServerState::STREAMING, Message::DISCARD, Signature::SUCCESS, ServerState::STREAMING],
2927
[ServerState::STREAMING, Message::DISCARD, Signature::SUCCESS, ServerState::READY],
3028
[ServerState::STREAMING, Message::DISCARD, Signature::FAILURE, ServerState::FAILED],
3129
[ServerState::STREAMING, Message::RESET, Signature::SUCCESS, ServerState::READY],
@@ -40,10 +38,8 @@ trait ServerStateTransition
4038
[ServerState::TX_READY, Message::RESET, Signature::FAILURE, ServerState::DEFUNCT],
4139
[ServerState::TX_STREAMING, Message::RUN, Signature::SUCCESS, ServerState::TX_STREAMING],
4240
[ServerState::TX_STREAMING, Message::RUN, Signature::FAILURE, ServerState::FAILED],
43-
[ServerState::TX_STREAMING, Message::PULL, Signature::SUCCESS, ServerState::TX_STREAMING],
4441
[ServerState::TX_STREAMING, Message::PULL, Signature::SUCCESS, ServerState::TX_READY],
4542
[ServerState::TX_STREAMING, Message::PULL, Signature::FAILURE, ServerState::FAILED],
46-
[ServerState::TX_STREAMING, Message::DISCARD, Signature::SUCCESS, ServerState::TX_STREAMING],
4743
[ServerState::TX_STREAMING, Message::DISCARD, Signature::SUCCESS, ServerState::TX_READY],
4844
[ServerState::TX_STREAMING, Message::DISCARD, Signature::FAILURE, ServerState::FAILED],
4945
[ServerState::TX_STREAMING, Message::RESET, Signature::SUCCESS, ServerState::READY],

0 commit comments

Comments
 (0)