Skip to content

Commit b4d30c6

Browse files
authored
Added execution time to pipeline report (#1380)
1 parent 31e927e commit b4d30c6

File tree

16 files changed

+250
-30
lines changed

16 files changed

+250
-30
lines changed

composer.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
"monolog/monolog": "^2.0||^3.0",
3232
"packaged/thrift": "^0.15.0",
3333
"php-http/discovery": "^1.0",
34+
"psr/clock": "^1.0",
3435
"psr/http-client": "^1.0",
3536
"psr/http-message": "^1.0 || ^2.0",
3637
"psr/log": "^2.0 || ^3.0",

composer.lock

Lines changed: 51 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/core/etl/composer.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
"php": "~8.2.0 || ~8.3.0 || ~8.4.0",
1313
"ext-json": "*",
1414
"ext-mbstring": "*",
15+
"psr/clock": "^1.0",
1516
"flow-php/array-dot": "^0.10.0 || 1.x-dev",
1617
"flow-php/rdsl": "^0.10.0 || 1.x-dev",
1718
"flow-php/filesystem": "^0.10.0 || 1.x-dev",
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Flow\Clock;
6+
7+
use Psr\Clock\ClockInterface;
8+
9+
final class FakeClock implements ClockInterface
10+
{
11+
public function __construct(private \DateTimeImmutable $dateTime = new \DateTimeImmutable('now'))
12+
{
13+
}
14+
15+
public function modify(string $modify) : void
16+
{
17+
$this->dateTime = $this->dateTime->modify($modify);
18+
}
19+
20+
public function now() : \DateTimeImmutable
21+
{
22+
return $this->dateTime;
23+
}
24+
25+
public function set(\DateTimeImmutable $dateTime) : void
26+
{
27+
$this->dateTime = $dateTime;
28+
}
29+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Flow\Clock;
6+
7+
use Psr\Clock\ClockInterface;
8+
9+
final readonly class SystemClock implements ClockInterface
10+
{
11+
public function __construct(private \DateTimeZone $timezone)
12+
{
13+
}
14+
15+
public static function system() : self
16+
{
17+
return new self(new \DateTimeZone(date_default_timezone_get()));
18+
}
19+
20+
public static function utc() : self
21+
{
22+
return new self(new \DateTimeZone('UTC'));
23+
}
24+
25+
public function now() : \DateTimeImmutable
26+
{
27+
return new \DateTimeImmutable('now', $this->timezone);
28+
}
29+
}

src/core/etl/src/Flow/ETL/Config.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
use Flow\ETL\Row\EntryFactory;
1414
use Flow\Filesystem\{FilesystemTable};
1515
use Flow\Serializer\Serializer;
16+
use Psr\Clock\ClockInterface;
1617

1718
/**
1819
* Immutable configuration that can be used to initialize many contexts.
@@ -33,6 +34,7 @@
3334
public function __construct(
3435
private string $id,
3536
private Serializer $serializer,
37+
private ClockInterface $clock,
3638
private FilesystemTable $filesystemTable,
3739
private FilesystemStreams $filesystemStreams,
3840
private Optimizer $optimizer,
@@ -59,6 +61,11 @@ public function caster() : Caster
5961
return $this->caster;
6062
}
6163

64+
public function clock() : ClockInterface
65+
{
66+
return $this->clock;
67+
}
68+
6269
public function entryFactory() : EntryFactory
6370
{
6471
return $this->entryFactory;

src/core/etl/src/Flow/ETL/Config/ConfigBuilder.php

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
namespace Flow\ETL\Config;
66

77
use function Flow\Filesystem\DSL\fstab;
8+
use Flow\Clock\SystemClock;
89
use Flow\ETL\Config\Cache\CacheConfigBuilder;
910
use Flow\ETL\Config\Sort\SortConfigBuilder;
1011
use Flow\ETL\Filesystem\FilesystemStreams;
@@ -15,6 +16,7 @@
1516
use Flow\ETL\{Cache, Config, NativePHPRandomValueGenerator, RandomValueGenerator};
1617
use Flow\Filesystem\{Filesystem, FilesystemTable};
1718
use Flow\Serializer\{Base64Serializer, NativePHPSerializer, Serializer};
19+
use Psr\Clock\ClockInterface;
1820

1921
final class ConfigBuilder
2022
{
@@ -24,6 +26,8 @@ final class ConfigBuilder
2426

2527
private ?Caster $caster;
2628

29+
private ?ClockInterface $clock;
30+
2731
private ?FilesystemTable $fstab;
2832

2933
private ?string $id;
@@ -44,6 +48,7 @@ public function __construct()
4448
$this->putInputIntoRows = false;
4549
$this->optimizer = null;
4650
$this->caster = null;
51+
$this->clock = null;
4752
$this->cache = new CacheConfigBuilder();
4853
$this->sort = new SortConfigBuilder();
4954
$this->randomValueGenerator = new NativePHPRandomValueGenerator();
@@ -54,7 +59,7 @@ public function build() : Config
5459
$this->id ??= 'flow_php' . $this->randomValueGenerator->string(32);
5560
$entryFactory = new NativeEntryFactory();
5661
$this->serializer ??= new Base64Serializer(new NativePHPSerializer());
57-
62+
$this->clock ??= SystemClock::utc();
5863
$this->optimizer ??= new Optimizer(
5964
new Optimizer\LimitOptimization(),
6065
new Optimizer\BatchSizeOptimization(batchSize: 1000)
@@ -65,6 +70,7 @@ public function build() : Config
6570
return new Config(
6671
$this->id,
6772
$this->serializer,
73+
$this->clock,
6874
$this->fstab(),
6975
new FilesystemStreams($this->fstab()),
7076
$this->optimizer,
@@ -83,6 +89,13 @@ public function cache(Cache $cache) : self
8389
return $this;
8490
}
8591

92+
public function clock(ClockInterface $clocks) : self
93+
{
94+
$this->clock = $clocks;
95+
96+
return $this;
97+
}
98+
8699
public function dontPutInputIntoRows() : self
87100
{
88101
$this->putInputIntoRows = false;

src/core/etl/src/Flow/ETL/DataFrame.php

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -745,7 +745,7 @@ public function rows(Transformer|Transformation $transformer) : self
745745
/**
746746
* @trigger
747747
*
748-
* @param null|callable(Rows $rows): void $callback
748+
* @param null|callable(Rows $rows, FlowContext $context): void $callback
749749
* @param bool $analyze - when set to true, run will return Report
750750
*/
751751
#[DSLMethod(exclude: true)]
@@ -756,9 +756,13 @@ public function run(?callable $callback = null, bool $analyze = false) : ?Report
756756
$totalRows = 0;
757757
$schema = new Schema();
758758

759+
if ($analyze) {
760+
$startedAt = $this->context->config->clock()->now();
761+
}
762+
759763
foreach ($clone->pipeline->process($clone->context) as $rows) {
760764
if ($callback !== null) {
761-
$callback($rows);
765+
$callback($rows, $clone->context);
762766
}
763767

764768
if ($analyze) {
@@ -768,7 +772,9 @@ public function run(?callable $callback = null, bool $analyze = false) : ?Report
768772
}
769773

770774
if ($analyze) {
771-
return new Report($schema, new Statistics($totalRows));
775+
$endedAt = $this->context->config->clock()->now();
776+
777+
return new Report($schema, new Statistics($totalRows, new Statistics\ExecutionTime($startedAt, $endedAt)));
772778
}
773779

774780
return null;

src/core/etl/src/Flow/ETL/Dataset/Statistics.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@
44

55
namespace Flow\ETL\Dataset;
66

7+
use Flow\ETL\Dataset\Statistics\ExecutionTime;
8+
79
final readonly class Statistics
810
{
911
public function __construct(
1012
private int $totalRows,
13+
public readonly ExecutionTime $executionTime,
1114
) {
1215
}
1316

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Flow\ETL\Dataset\Statistics;
6+
7+
use Flow\ETL\Exception\InvalidArgumentException;
8+
9+
final readonly class ExecutionTime
10+
{
11+
public function __construct(public \DateTimeImmutable $startedAt, public \DateTimeImmutable $finishedAt)
12+
{
13+
if ($startedAt > $finishedAt) {
14+
throw new InvalidArgumentException('Execution start date must be before finish date');
15+
}
16+
}
17+
18+
public function duration() : \DateInterval
19+
{
20+
return $this->startedAt->diff($this->finishedAt);
21+
}
22+
23+
public function inSeconds() : int
24+
{
25+
return $this->finishedAt->getTimestamp() - $this->startedAt->getTimestamp();
26+
}
27+
}

0 commit comments

Comments
 (0)