diff --git a/src/Io/LazyConnectionPool.php b/src/Io/LazyConnectionPool.php new file mode 100644 index 0000000..3f3c713 --- /dev/null +++ b/src/Io/LazyConnectionPool.php @@ -0,0 +1,104 @@ +connectionSelector = $connectionSelector; + $this->poolSize = $poolSize; + for ($i = 0; $i < $poolSize; $i++) { + $this->pool[$i] = $connection = $factory->createLazyConnection($connectionURI); + $this->requestCounter[$i] = 0; + Util::forwardEvents($connection, $this, ['error', 'close']); + } + } + + /** + * set the internal pool-pointer to the next valid connection on depending on the connectionSelector + * @return int + */ + protected function shiftPoolPointer(): int + { + switch ($this->connectionSelector) { + case self::CS_ROUND_ROBIN: + $this->poolPointer = ($this->poolPointer + 1) % $this->poolSize; + break; + case self::CS_BY_LOAD: + $rcList = $this->requestCounter; // copy + asort($rcList, SORT_NUMERIC); + $this->poolPointer = key($rcList); + break; + } + return $this->poolPointer; + } + + /** + * @param callable $callback received an ConnectionInterface as parameter + * @return mixed + */ + protected function pooledCallback(callable $callback) + { + $pointer = $this->shiftPoolPointer(); + $this->requestCounter[$pointer]++; + $connection = $this->pool[$pointer]; + return $callback($connection)->then(function ($result) use ($pointer) { + $this->requestCounter[$pointer]--; + return $result; + }); + } + + public function query($sql, array $params = array()): \React\Promise\PromiseInterface + { + return $this->pooledCallback(function (ConnectionInterface $connection) use ($sql, $params) { + return $connection->query($sql, $params); + }); + } + + public function queryStream($sql, $params = array()): \React\Stream\ReadableStreamInterface + { + return $this->pooledCallback(function (ConnectionInterface $connection) use ($sql, $params) { + return $connection->queryStream($sql, $params); + }); + } + + public function ping(): \React\Promise\PromiseInterface + { + return $this->pooledCallback(function (ConnectionInterface $connection) { + return $connection->ping(); + }); + } + + public function quit(): \React\Promise\PromiseInterface + { + return resolve(array_map(function ($connection) { + $connection->quit(); + return $connection; + }, $this->pool)); + } + + public function close(): \React\Promise\PromiseInterface + { + return resolve(array_map(function ($connection) { + $connection->close(); + return $connection; + }, $this->pool)); + } +}