Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions phpstan.neon
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ parameters:
level: 8
treatPhpDocTypesAsCertain: false
ignoreErrors:
- identifier: argument.type
- identifier: missingType.iterableValue
- identifier: missingType.generics
paths:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public function extract(FlowContext $context) : \Generator
continue;
}

/** @var array<mixed> $rowData */
/** @var non-empty-list<null|string> $rowData */
$rowData = \str_getcsv($csvLine, $separator, $enclosure, $escape);

if (!\count($headers)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public function extract(FlowContext $context) : \Generator
} else {
$countQuery = (clone $this->queryBuilder)->select('COUNT(*)');

// @phpstan-ignore-next-line
if (\method_exists($countQuery, 'resetOrderBy')) {
$countQuery->resetOrderBy();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,15 @@ public function queryParamName() : string
return $this->queryParamName;
}

public function toQueryParam(Rows $rows) : mixed
/**
* @return array<array-key, null|bool|float|int|string>
*/
public function toQueryParam(Rows $rows) : array
{
return $rows->reduceToArray($this->ref);
}

public function type() : int|ArrayParameterType|null
public function type() : int|ArrayParameterType
{
return $this->type;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
final class ElasticsearchExtractor implements Extractor
{
/**
* @phpstan-ignore-next-line
*
* @psalm-suppress UndefinedClass
*
* @phpstan-ignore-next-line
*/
private \Elasticsearch\Client|\Elastic\Elasticsearch\Client|null $client;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ public function __construct(private LoggerInterface $logger, private string $mes

public function load(Rows $rows, FlowContext $context) : void
{
/**
* @psalm-var callable(Row) : void $loader
*/
$loader = function (Row $row) : void {
$this->logger->log($this->logLevel, $this->message, $row->toArray());
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,8 @@ final class CacheConfigBuilder

public function build(FilesystemTable $fstab, Serializer $serializer) : CacheConfig
{
$cachePath = \is_string(\getenv(CacheConfig::CACHE_DIR_ENV)) && \getenv(CacheConfig::CACHE_DIR_ENV) !== ''
? \getenv(CacheConfig::CACHE_DIR_ENV)
: \sys_get_temp_dir() . '/flow_php/cache';

if (!\is_string($cachePath)) {
throw new RuntimeException('Cache directory must be a string, got ' . \gettype($cachePath));
}
$cachePath = \getenv(CacheConfig::CACHE_DIR_ENV) ?: '';
$cachePath = $cachePath !== '' ? $cachePath : \sys_get_temp_dir() . '/flow_php/cache';

if (!\file_exists($cachePath)) {
if (!mkdir($cachePath, 0777, true) && !is_dir($cachePath)) {
Expand Down
3 changes: 1 addition & 2 deletions src/core/etl/src/Flow/ETL/DataFrame.php
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,14 @@ public static function fromArray(array $definition) : self
}

/**
* @throws \JsonException
* @throws InvalidArgumentException
*/
public static function fromJson(string $json) : self
{
try {
return self::fromArray(\json_decode($json, true, 512, JSON_THROW_ON_ERROR));
} catch (\JsonException $exception) {
throw new InvalidFileFormatException('json', 'unknown');
throw new InvalidFileFormatException('json', 'unknown', $exception);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@

namespace Flow\ETL\Extractor;

use Flow\ETL\Extractor;

/**
* Limitable extractor is one that can be limited to extract only given number of rows.
* Whenever limit is set in a pipeline before any transformations, LogicalPlan processor will try
* to grab that limit and inject it directly to the extractor to avoid unnecessary processing.
*/
interface LimitableExtractor
interface LimitableExtractor extends Extractor
{
public function changeLimit(int $limit) : void;

Expand Down
2 changes: 1 addition & 1 deletion src/core/etl/src/Flow/ETL/Function/Split.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public function __construct(
) {
}

public function eval(Row $row) : array|string|null
public function eval(Row $row) : ?array
{
$value = (new Parameter($this->value))->asString($row);
$separator = (new Parameter($this->separator))->asString($row);
Expand Down
2 changes: 1 addition & 1 deletion src/core/etl/src/Flow/ETL/Function/XPath.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public function __construct(
) {
}

public function eval(Row $row) : \DOMNode|array|null
public function eval(Row $row) : ?array
{
$value = (new Parameter($this->value))->asInstanceOf($row, \DOMNode::class);
$path = (new Parameter($this->path))->asString($row);
Expand Down
2 changes: 1 addition & 1 deletion src/core/etl/src/Flow/ETL/Loader/ArrayLoader.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
final class ArrayLoader implements Loader
{
/**
* @param-out array<array<mixed>> $array
* @param array<array<mixed>> $array
*/
public function __construct(private array &$array)
{
Expand Down
4 changes: 2 additions & 2 deletions src/core/etl/src/Flow/ETL/Loader/CallbackLoader.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
final class CallbackLoader implements Loader
{
/**
* @phpstan-ignore-next-line
*
* @param callable(Rows $row, FlowContext $context) : void $callback
*
* @phpstan-ignore-next-line
*/
private $callback;

Expand Down
6 changes: 0 additions & 6 deletions src/core/etl/src/Flow/ETL/Pipeline/CollectingPipeline.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,13 @@

namespace Flow\ETL\Pipeline;

use Flow\ETL\Exception\InvalidArgumentException;
use Flow\ETL\{Extractor, FlowContext, Loader, Pipeline, Rows, Transformer};

/**
* @internal
*/
final class CollectingPipeline implements Pipeline
{
/**
* @param Pipeline $pipeline
*
* @throws InvalidArgumentException
*/
public function __construct(private readonly Pipeline $pipeline)
{
}
Expand Down
6 changes: 1 addition & 5 deletions src/core/etl/src/Flow/ETL/Row.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ public static function with(Entry ...$entries) : self

/**
* @throws InvalidArgumentException
*
* @return $this
*/
public function add(Entry ...$entries) : self
{
Expand Down Expand Up @@ -151,10 +149,8 @@ public function toArray(bool $withKeys = true) : array

/**
* @throws InvalidArgumentException
*
* @return mixed
*/
public function valueOf(string|Reference $name)
public function valueOf(string|Reference $name) : mixed
{
return $this->get($name)->value();
}
Expand Down
7 changes: 1 addition & 6 deletions src/core/etl/src/Flow/ETL/Row/Entries.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ public function __construct(Entry ...$entries)

/**
* @throws InvalidArgumentException
*
* @return $this
*/
public function add(Entry ...$entries) : self
{
Expand Down Expand Up @@ -222,7 +220,7 @@ public function offsetSet(mixed $offset, mixed $value) : void
/**
* @param array-key $offset
*
* @throws InvalidArgumentException
* @throws RuntimeException
*/
public function offsetUnset(mixed $offset) : void
{
Expand Down Expand Up @@ -312,9 +310,6 @@ public function rename(string|Reference $currentName, string|Reference $newName)
return self::recreate($entries);
}

/**
* @return $this
*/
public function set(Entry ...$entries) : self
{
$newEntries = $this->entries;
Expand Down
4 changes: 0 additions & 4 deletions src/core/etl/src/Flow/ETL/Row/Schema/Metadata.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ public static function fromArray(array $map) : self
/**
* @param string $key
* @param array<bool|float|int|string>|bool|float|int|string $value
*
* @return $this
*/
public static function with(string $key, int|string|bool|float|array $value) : self
{
Expand All @@ -52,8 +50,6 @@ public static function with(string $key, int|string|bool|float|array $value) : s
/**
* @param string $key
* @param array<bool|float|int|string>|bool|float|int|string $value
*
* @return $this
*/
public function add(string $key, int|string|bool|float|array $value) : self
{
Expand Down
6 changes: 0 additions & 6 deletions src/core/etl/src/Flow/ETL/Rows.php
Original file line number Diff line number Diff line change
Expand Up @@ -674,8 +674,6 @@ public function sort(callable $callback) : self

/**
* @throws InvalidArgumentException
*
* @return $this
*/
public function sortAscending(string|Reference $ref) : self
{
Expand All @@ -687,8 +685,6 @@ public function sortAscending(string|Reference $ref) : self

/**
* @throws InvalidArgumentException
*
* @return $this
*/
public function sortBy(Reference ...$refs) : self
{
Expand All @@ -703,8 +699,6 @@ public function sortBy(Reference ...$refs) : self

/**
* @throws InvalidArgumentException
*
* @return $this
*/
public function sortDescending(string|Reference $ref) : self
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,14 @@ final class CopyBlobOptions implements EndpointOptions

private ?int $timeoutSeconds = null;

private ?string $version = BlobService::VERSION;
private string $version = BlobService::VERSION;

private ?string $versionId = null;

public function toHeaders() : array
{
$headers = [];

if ($this->version !== null) {
$headers['x-ms-version'] = $this->version;
}

$headers['x-ms-version'] = $this->version;
$headers['User-Agent'] = $this->userAgentHeader();

if ($this->requestId !== null) {
Expand Down Expand Up @@ -82,6 +78,20 @@ public function toURIParameters() : array
return $uriParameters;
}

public function withDeleteSnapshots(DeleteSnapshots $deleteSnapshots) : self
{
$this->deleteSnapshots = $deleteSnapshots;

return $this;
}

public function withDeleteType(DeleteType $deleteType) : self
{
$this->deleteType = $deleteType;

return $this;
}

public function withLeaseId(string $leaseId) : self
{
$this->leaseId = $leaseId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,14 @@ final class CreateContainerOptions implements EndpointOptions

private ?int $timeoutSeconds = null;

private ?string $version = BlobService::VERSION;
private string $version = BlobService::VERSION;

public function toHeaders() : array
{
$headers = [];

$headers['user-agent'] = $this->userAgentHeader();

if ($this->version !== null) {
$headers['x-ms-version'] = $this->version;
}
$headers['x-ms-version'] = $this->version;

if ($this->requestId !== null) {
$headers['x-ms-client-request-id'] = $this->requestId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ final class DeleteBlobOptions implements EndpointOptions

private ?int $timeoutSeconds = null;

private ?string $version = BlobService::VERSION;
private string $version = BlobService::VERSION;

private ?string $versionId = null;

Expand All @@ -31,10 +31,7 @@ public function toHeaders() : array
$headers = [];

$headers['user-agent'] = $this->userAgentHeader();

if ($this->version !== null) {
$headers['x-ms-version'] = $this->version;
}
$headers['x-ms-version'] = $this->version;

if ($this->requestId !== null) {
$headers['x-ms-client-request-id'] = $this->requestId;
Expand Down Expand Up @@ -76,6 +73,20 @@ public function toURIParameters() : array
return $uriParameters;
}

public function withDeleteSnapshots(DeleteSnapshots $deleteSnapshots) : self
{
$this->deleteSnapshots = $deleteSnapshots;

return $this;
}

public function withDeleteType(DeleteType $deleteType) : self
{
$this->deleteType = $deleteType;

return $this;
}

public function withLeaseId(string $leaseId) : self
{
$this->leaseId = $leaseId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,14 @@ final class DeleteContainerOptions implements EndpointOptions

private ?int $timeoutSeconds = null;

private ?string $version = BlobService::VERSION;
private string $version = BlobService::VERSION;

public function toHeaders() : array
{
$headers = [];

$headers['user-agent'] = $this->userAgentHeader();

if ($this->version !== null) {
$headers['x-ms-version'] = $this->version;
}
$headers['x-ms-version'] = $this->version;

if ($this->requestId !== null) {
$headers['x-ms-client-request-id'] = $this->requestId;
Expand Down
Loading