Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 49 additions & 25 deletions src/connection/Socket.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,25 @@
class Socket extends AConnection
{
/**
* @var resource|\Socket|bool
* @var \Socket|bool
*/
private $socket = false;

private const POSSIBLE_TIMEOUTS_CODES = [11, 10060];
private const POSSIBLE_RETRY_CODES = [4, 10004];
private const POSSIBLE_RETRY_CODES = [
SOCKET_EINTR,
SOCKET_EWOULDBLOCK
];

public function connect(): bool
public function __construct(string $ip = '127.0.0.1', int $port = 7687, float $timeout = 15)
{
if (!extension_loaded('sockets')) {
throw new ConnectException('PHP Extension sockets not enabled');
}
parent::__construct($ip, $port, $timeout);
}

public function connect(): bool
{
$this->socket = @socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
if ($this->socket === false) {
throw new ConnectException('Cannot create socket');
Expand All @@ -42,10 +48,9 @@ public function connect(): bool
socket_set_option($this->socket, SOL_SOCKET, SO_KEEPALIVE, 1);
$this->configureTimeout();

$conn = @socket_connect($this->socket, $this->ip, $this->port);
if (!$conn) {
$code = socket_last_error($this->socket);
throw new ConnectException(socket_strerror($code), $code);
$start = microtime(true);
if (!@socket_connect($this->socket, $this->ip, $this->port)) {
$this->throwConnectException($start);
}

return true;
Expand All @@ -57,40 +62,54 @@ public function write(string $buffer): void
throw new ConnectException('Not initialized socket');
}

if (Bolt::$debug)
if (Bolt::$debug) {
$this->printHex($buffer);
}

$start = microtime(true);
$size = mb_strlen($buffer, '8bit');
while (0 < $size) {
$sent = @socket_write($this->socket, $buffer, $size);
if ($sent === false)
$this->throwConnectException();
if ($sent === false || $sent === 0) {
if (in_array(socket_last_error($this->socket), self::POSSIBLE_RETRY_CODES, true)) {
continue;
}
$this->throwConnectException($start);
}

$buffer = mb_strcut($buffer, $sent, null, '8bit');
$size -= $sent;
}
}

public function read(int $length = 2048): string
{
if ($this->socket === false)
if ($this->socket === false) {
throw new ConnectException('Not initialized socket');
}

$output = '';
$t = microtime(true);
$start = microtime(true);
do {
if (mb_strlen($output, '8bit') == 0 && $this->timeout > 0 && (microtime(true) - $t) >= $this->timeout)
throw new ConnectionTimeoutException('Read from connection reached timeout after ' . $this->timeout . ' seconds.');
$readed = @socket_read($this->socket, $length - mb_strlen($output, '8bit'));
if ($readed === false) {
if (in_array(socket_last_error($this->socket), self::POSSIBLE_RETRY_CODES, true))
if ($this->timeout > 0 && (microtime(true) - $start) >= $this->timeout) {
$this->throwConnectException($start);
}
$readed = '';
$result = @socket_recv($this->socket, $readed, $length - mb_strlen($output, '8bit'), 0);
if ($result === false) {
if (in_array(socket_last_error($this->socket), self::POSSIBLE_RETRY_CODES, true)) {
continue;
$this->throwConnectException();
}
$this->throwConnectException($start);
} elseif ($result === 0) {
throw new ConnectException('Connection closed by remote host');
}
$output .= $readed;
} while (mb_strlen($output, '8bit') < $length);

if (Bolt::$debug)
if (Bolt::$debug) {
$this->printHex($output, 'S: ');
}

return $output;
}
Expand All @@ -111,23 +130,28 @@ public function setTimeout(float $timeout): void

private function configureTimeout(): void
{
if ($this->socket === false)
if ($this->socket === false) {
return;
$timeoutSeconds = floor($this->timeout);
$microSeconds = floor(($this->timeout - $timeoutSeconds) * 1000000);
}
$timeoutSeconds = (int)floor($this->timeout);
$microSeconds = (int)floor(($this->timeout - $timeoutSeconds) * 1000000);
$timeoutOption = ['sec' => $timeoutSeconds, 'usec' => $microSeconds];
socket_set_option($this->socket, SOL_SOCKET, SO_RCVTIMEO, $timeoutOption);
socket_set_option($this->socket, SOL_SOCKET, SO_SNDTIMEO, $timeoutOption);
}

/**
* Throws an exception based on the last socket error or timeout.
* @param float|null $start
* @throws ConnectException
* @throws ConnectionTimeoutException
*/
private function throwConnectException(): void
private function throwConnectException(float|null $start = null): void
{
$code = socket_last_error($this->socket);
if (in_array($code, self::POSSIBLE_TIMEOUTS_CODES)) {
if ($code === SOCKET_ETIMEDOUT) {
throw new ConnectionTimeoutException('Connection timeout reached after ' . $this->timeout . ' seconds.');
} elseif ($start !== null && $this->timeout > 0 && (microtime(true) - $start) >= $this->timeout) {
throw new ConnectionTimeoutException('Connection timeout reached after ' . $this->timeout . ' seconds.');
} elseif ($code !== 0) {
throw new ConnectException(socket_strerror($code), $code);
Expand Down
6 changes: 3 additions & 3 deletions tests/BoltTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public function testSockets(): void
if (!extension_loaded('sockets'))
$this->markTestSkipped('Sockets extension not available');

$conn = new \Bolt\connection\Socket($GLOBALS['NEO_HOST'] ?? '127.0.0.1', $GLOBALS['NEO_PORT'] ?? 7687, 3);
$conn = new \Bolt\connection\Socket($GLOBALS['NEO_HOST'], $GLOBALS['NEO_PORT'], 3);
$this->assertInstanceOf(\Bolt\connection\Socket::class, $conn);

$bolt = new Bolt($conn);
Expand Down Expand Up @@ -58,7 +58,7 @@ public function testAura(): void

public function testHello(): AProtocol
{
$conn = new \Bolt\connection\StreamSocket($GLOBALS['NEO_HOST'] ?? '127.0.0.1', $GLOBALS['NEO_PORT'] ?? 7687);
$conn = new \Bolt\connection\StreamSocket($GLOBALS['NEO_HOST'], $GLOBALS['NEO_PORT']);
$this->assertInstanceOf(\Bolt\connection\StreamSocket::class, $conn);

$bolt = new Bolt($conn);
Expand Down Expand Up @@ -150,7 +150,7 @@ public function testRoute(AProtocol $protocol): void
if (version_compare($protocol->getVersion(), 4.3, '>=')) {
$response = $protocol
->route([
'address' => ($GLOBALS['NEO_HOST'] ?? '127.0.0.1') . ':' . ($GLOBALS['NEO_PORT'] ?? 7687)
'address' => $GLOBALS['NEO_HOST'] . ':' . $GLOBALS['NEO_PORT']
])
->getResponse();
$this->assertEquals(Signature::SUCCESS, $response->signature);
Expand Down
6 changes: 5 additions & 1 deletion tests/TestLayer.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@ public static function setUpBeforeClass(): void
$host = getenv('GDB_HOST');
if (!empty($host))
$GLOBALS['NEO_HOST'] = $host;
if (!isset($GLOBALS['NEO_HOST']))
$GLOBALS['NEO_HOST'] = '127.0.0.1';
$port = getenv('GDB_PORT');
if (!empty($port))
$GLOBALS['NEO_PORT'] = $port;
$GLOBALS['NEO_PORT'] = (int)$port;
if (!isset($GLOBALS['NEO_PORT']))
$GLOBALS['NEO_PORT'] = 7687;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion tests/connection/ConnectionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,6 @@ public function testTimeoutRecoverAndReset(string $alias): void

private function getConnection(string $class): IConnection
{
return new $class($GLOBALS['NEO_HOST'] ?? '127.0.0.1', (int)($GLOBALS['NEO_PORT'] ?? 7687), 1);
return new $class($GLOBALS['NEO_HOST'], $GLOBALS['NEO_PORT'], 1);
}
}
2 changes: 1 addition & 1 deletion tests/error/ErrorsTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public function testPackException1(): void

public function testPackException2(): void
{
$conn = new \Bolt\connection\StreamSocket($GLOBALS['NEO_HOST'] ?? '127.0.0.1', $GLOBALS['NEO_PORT'] ?? 7687);
$conn = new \Bolt\connection\StreamSocket($GLOBALS['NEO_HOST'], $GLOBALS['NEO_PORT']);
$this->assertInstanceOf(\Bolt\connection\StreamSocket::class, $conn);

$bolt = new \Bolt\Bolt($conn);
Expand Down
2 changes: 1 addition & 1 deletion tests/packstream/v1/BytesTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class BytesTest extends TestLayer
{
public function testInit(): AProtocol
{
$conn = new \Bolt\connection\StreamSocket($GLOBALS['NEO_HOST'] ?? '127.0.0.1', $GLOBALS['NEO_PORT'] ?? 7687);
$conn = new \Bolt\connection\StreamSocket($GLOBALS['NEO_HOST'], $GLOBALS['NEO_PORT']);
$this->assertInstanceOf(\Bolt\connection\StreamSocket::class, $conn);

$bolt = new Bolt($conn);
Expand Down
2 changes: 1 addition & 1 deletion tests/packstream/v1/PackerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class PackerTest extends TestLayer
{
public function testInit(): AProtocol
{
$conn = new \Bolt\connection\StreamSocket($GLOBALS['NEO_HOST'] ?? '127.0.0.1', $GLOBALS['NEO_PORT'] ?? 7687);
$conn = new \Bolt\connection\StreamSocket($GLOBALS['NEO_HOST'], $GLOBALS['NEO_PORT']);
$this->assertInstanceOf(\Bolt\connection\StreamSocket::class, $conn);

$bolt = new Bolt($conn);
Expand Down
2 changes: 1 addition & 1 deletion tests/packstream/v1/UnpackerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class UnpackerTest extends TestLayer
{
public function testInit(): AProtocol
{
$conn = new \Bolt\connection\StreamSocket($GLOBALS['NEO_HOST'] ?? '127.0.0.1', $GLOBALS['NEO_PORT'] ?? 7687);
$conn = new \Bolt\connection\StreamSocket($GLOBALS['NEO_HOST'], $GLOBALS['NEO_PORT']);
$this->assertInstanceOf(\Bolt\connection\StreamSocket::class, $conn);

$bolt = new Bolt($conn);
Expand Down
2 changes: 1 addition & 1 deletion tests/structures/V6/StructuresTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class StructuresTest extends \Bolt\tests\structures\StructureLayer
{
public function testInit(): AProtocol
{
$conn = new \Bolt\connection\StreamSocket($GLOBALS['NEO_HOST'] ?? '127.0.0.1', $GLOBALS['NEO_PORT'] ?? 7687);
$conn = new \Bolt\connection\StreamSocket($GLOBALS['NEO_HOST'], $GLOBALS['NEO_PORT']);
$this->assertInstanceOf(\Bolt\connection\StreamSocket::class, $conn);

$bolt = new Bolt($conn);
Expand Down
2 changes: 1 addition & 1 deletion tests/structures/v1/StructuresTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class StructuresTest extends \Bolt\tests\structures\StructureLayer
{
public function testInit(): AProtocol|V4_4|V4_3|V4_2|V3
{
$conn = new \Bolt\connection\StreamSocket($GLOBALS['NEO_HOST'] ?? '127.0.0.1', $GLOBALS['NEO_PORT'] ?? 7687);
$conn = new \Bolt\connection\StreamSocket($GLOBALS['NEO_HOST'], $GLOBALS['NEO_PORT']);
$this->assertInstanceOf(\Bolt\connection\StreamSocket::class, $conn);

$bolt = new Bolt($conn);
Expand Down
2 changes: 1 addition & 1 deletion tests/structures/v4_3/StructuresTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class StructuresTest extends \Bolt\tests\structures\StructureLayer
{
public function testInit(): AProtocol|V4_4|V4_3
{
$conn = new \Bolt\connection\StreamSocket($GLOBALS['NEO_HOST'] ?? '127.0.0.1', $GLOBALS['NEO_PORT'] ?? 7687);
$conn = new \Bolt\connection\StreamSocket($GLOBALS['NEO_HOST'], $GLOBALS['NEO_PORT']);
$this->assertInstanceOf(\Bolt\connection\StreamSocket::class, $conn);

$bolt = new Bolt($conn);
Expand Down
6 changes: 1 addition & 5 deletions tests/structures/v5/StructuresTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,7 @@

use Bolt\Bolt;
use Bolt\protocol\AProtocol;
use Bolt\tests\structures\v1\DateTimeTrait;
use Bolt\tests\structures\v1\DateTimeZoneIdTrait;
use Bolt\protocol\v5\structures\{
DateTime,
DateTimeZoneId,
Node,
Relationship,
UnboundRelationship
Expand All @@ -26,7 +22,7 @@ class StructuresTest extends \Bolt\tests\structures\StructureLayer
{
public function testInit(): AProtocol
{
$conn = new \Bolt\connection\StreamSocket($GLOBALS['NEO_HOST'] ?? '127.0.0.1', $GLOBALS['NEO_PORT'] ?? 7687);
$conn = new \Bolt\connection\StreamSocket($GLOBALS['NEO_HOST'], $GLOBALS['NEO_PORT']);
$this->assertInstanceOf(\Bolt\connection\StreamSocket::class, $conn);

$bolt = new Bolt($conn);
Expand Down