Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,87 @@ new class implements Transformation {
```

The example above adds a new `time` column containing the current timestamp to the dataset.

## Collecting Telemetry After Streaming

When streaming large datasets, you may want to collect telemetry data such as total row counts, execution time,
or memory usage after the streaming completes. Flow provides a `StreamClosure` mechanism that gets called
with a `Report` containing statistics about the streamed data.

To receive a `Report`, you need to:
1. Configure `Analyze` in the `Config` by passing it to the `config()` method
2. Register an `onComplete()` callback, created with the `http_on_complete()` DSL function

```php
<?php

namespace Symfony\Application\Controller;

use Flow\Bridge\Symfony\HttpFoundation\Response\FlowStreamedResponse;
use Flow\ETL\Config;
use Flow\ETL\Dataset\Report;
use Psr\Log\LoggerInterface;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\Routing\Attribute\Route;
use function Flow\Bridge\Symfony\HttpFoundation\http_json_output;
use function Flow\Bridge\Symfony\HttpFoundation\http_on_complete;
use function Flow\Bridge\Symfony\HttpFoundation\http_stream_open;
use function Flow\ETL\Adapter\Parquet\from_parquet;
use function Flow\ETL\DSL\analyze;

/**
* Example controller: streams a Parquet file as JSON and logs telemetry
* from the Report once the stream has completed.
*/
final class ReportsController extends AbstractController
{
public function __construct(
private readonly LoggerInterface $logger,
) {}

#[Route('/report/stream', name: 'report_stream')]
public function streamReport() : FlowStreamedResponse
{
return http_stream_open(from_parquet(__DIR__ . '/reports/orders.parquet'))
// Enable Analyze so that a Report is produced; without it the callback receives null.
->config(Config::builder()->analyze(analyze()))
->onComplete(http_on_complete(function (?Report $report) : void {
// Guard against the "Analyze not configured" case, where no Report is available.
if ($report === null) {
return;
}

$this->logger->info('Stream completed', [
'total_rows' => $report->statistics()->totalRows(),
'execution_time_seconds' => $report->statistics()->executionTime->inSeconds(),
'memory_peak_mb' => $report->statistics()->memory->max()->inMb(),
]);
}))
->as('orders.json')
->streamedResponse(http_json_output());
}
}
```

### Analyze Options

The `analyze()` function supports additional options for collecting more detailed statistics:

- `analyze()` - Basic statistics (row count, execution time, memory usage)
- `analyze()->withSchema()` - Also collects schema information about the dataset
- `analyze()->withColumnStatistics()` - Also collects per-column statistics (min, max, null counts)
- `analyze()->withSchema()->withColumnStatistics()` - Collects all available statistics

```php
// Collect schema information along with basic statistics
->config(Config::builder()->analyze(analyze()->withSchema()))

// Collect everything
->config(Config::builder()->analyze(analyze()->withSchema()->withColumnStatistics()))
```

### Report Contents

The `Report` object provides access to:

- `$report->statistics()->totalRows()` - Total number of rows streamed
- `$report->statistics()->executionTime->inSeconds()` - Execution duration
- `$report->statistics()->executionTime->startedAt` - Start timestamp
- `$report->statistics()->executionTime->finishedAt` - End timestamp
- `$report->statistics()->memory->max()` - Peak memory usage
- `$report->schema()` - Dataset schema (when `withSchema()` is enabled)
- `$report->statistics()->columns` - Column statistics (when `withColumnStatistics()` is enabled)
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@
namespace Flow\Bridge\Symfony\HttpFoundation;

use Flow\Bridge\Symfony\HttpFoundation\Response\{FlowBufferedResponse, FlowStreamedResponse};
use Flow\ETL\{Extractor, Transformation, Transformations};
use Flow\ETL\Config\ConfigBuilder;
use Flow\ETL\{Config, Extractor, Transformation, Transformations};
use Symfony\Component\HttpFoundation\{HeaderUtils, Response};

/**
* FlowStreamedResponse builder.
*/
final class DataStream
{
private Config|ConfigBuilder|null $config = null;

/**
* @var array<string, string>
*/
Expand All @@ -24,6 +27,8 @@ final class DataStream

private int $status = Response::HTTP_OK;

private ?StreamClosure $streamClosure = null;

/**
* @var array<Transformation>
*/
Expand Down Expand Up @@ -53,6 +58,17 @@ public function as(string $name, bool $attachment = true) : self
return $this;
}

/**
* Set the Config for the DataFrame execution.
* Use this to configure Analyze for enabling Report generation.
*
* The value is stored as given (either a built Config or a ConfigBuilder)
* and handed to the response object when response() or streamedResponse() is called.
*
* @return self the same builder instance, to allow method chaining
*/
public function config(Config|ConfigBuilder $config) : self
{
$this->config = $config;

return $this;
}

/**
* Set additional headers.
* Headers are merged with the default headers.
Expand All @@ -66,6 +82,18 @@ public function headers(array $headers) : self
return $this;
}

/**
* Set a closure to be called after streaming completes.
* The closure receives the Report from DataFrame execution.
* Note: Report will be null unless Analyze is configured via config().
*
* NOTE(review): in the code visible here only streamedResponse() forwards the
* closure; response() does not pass it on, so it appears to have no effect there.
*
* @return self the same builder instance, to allow method chaining
*/
public function onComplete(StreamClosure $streamClosure) : self
{
$this->streamClosure = $streamClosure;

return $this;
}

/**
* Create regular response where whole dataset is loaded into the memory.
* It's highly recommended to use limit transformation to avoid loading entire dataset into the memory.
Expand All @@ -80,7 +108,8 @@ public function response(Output $output) : FlowBufferedResponse
$output,
\count($this->transformations) ? new Transformations(...$this->transformations) : new Transformations(),
$this->status,
$this->headers
$this->headers,
$this->config,
);
}

Expand All @@ -99,15 +128,16 @@ public function status(int $status) : self
*/
public function streamedResponse(Output $output) : FlowStreamedResponse
{
    // The Content-Type header is always taken from the selected output format.
    $this->headers['Content-Type'] = $output->type()->toContentTypeHeader();

    return new FlowStreamedResponse(
        $this->extractor,
        $output,
        \count($this->transformations) ? new Transformations(...$this->transformations) : new Transformations(),
        $this->status,
        $this->headers,
        // Optional execution config; null when config() was never called.
        $this->config,
        // Optional completion callback; null when onComplete() was never called.
        $this->streamClosure,
    );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace Flow\Bridge\Symfony\HttpFoundation\Response;

use function Flow\ETL\DSL\df;
use Flow\Bridge\Symfony\HttpFoundation\Output;
use Flow\Bridge\Symfony\HttpFoundation\{Output, StreamClosure};
use Flow\ETL\Config\ConfigBuilder;
use Flow\ETL\{Config, Extractor, Transformation};
use Flow\ETL\Transformations;
Expand All @@ -25,6 +25,7 @@ public function __construct(
int $status = 200,
array $headers = [],
Config|ConfigBuilder|null $config = null,
private readonly ?StreamClosure $streamClosure = null,
) {
$this->config = $config ?? Config::default();

Expand All @@ -37,11 +38,15 @@ public function __construct(

/**
 * Run the DataFrame pipeline, writing the rows through the output's stdout loader,
 * and then hand the value returned by run() to the stream closure, if one was provided.
 */
private function stream() : void
{
    $report = df($this->config)
        ->read($this->extractor)
        ->with($this->transformations)
        ->dropPartitions()
        ->write($this->output->stdoutLoader())
        ->run();

    // Nullsafe call: nothing is invoked when no closure was registered.
    $this->streamClosure?->onComplete($report);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

declare(strict_types=1);

namespace Flow\Bridge\Symfony\HttpFoundation;

use Flow\ETL\Dataset\Report;

/**
* Interface for closures that are called after streaming completes.
* Implementations receive the Report from the DataFrame execution.
*/
interface StreamClosure
{
/**
* Called after the streamed DataFrame has finished running.
*
* @param null|Report $report the Report produced by the run; null unless Analyze is configured
*/
public function onComplete(?Report $report) : void;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Flow\Bridge\Symfony\HttpFoundation;

use Flow\Bridge\Symfony\HttpFoundation\Output\{CSVOutput, JsonOutput, ParquetOutput, XMLOutput};
use Flow\ETL\Dataset\Report;
use Flow\ETL\Extractor;

function http_stream_open(Extractor $extractor) : DataStream
Expand All @@ -31,3 +32,24 @@ function http_parquet_output() : ParquetOutput
{
return new ParquetOutput();
}

/**
 * Create a StreamClosure from a callable.
 *
 * The callable is converted to a \Closure once, up front, so the adapter can hold
 * it in a strictly typed property instead of an untyped `mixed` one.
 *
 * @param callable(?Report): void $callback invoked with the Report (or null) when onComplete() is called
 *
 * @return StreamClosure adapter that forwards onComplete() to the given callable
 */
function http_on_complete(callable $callback) : StreamClosure
{
    return new class(\Closure::fromCallable($callback)) implements StreamClosure {
        /**
         * @param \Closure(?Report): void $callback
         */
        public function __construct(
            private readonly \Closure $callback,
        ) {
        }

        public function onComplete(?Report $report) : void
        {
            ($this->callback)($report);
        }
    };
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,83 @@

namespace Flow\Bridge\Symfony\HttpFoundation\Tests\Integration;

use function Flow\Bridge\Symfony\HttpFoundation\{http_csv_output, http_xml_output};
use function Flow\Bridge\Symfony\HttpFoundation\{http_csv_output, http_on_complete, http_xml_output};
use function Flow\ETL\Adapter\JSON\from_json;
use function Flow\ETL\DSL\from_array;
use function Flow\ETL\DSL\{analyze, from_array};
use Flow\Bridge\Symfony\HttpFoundation\{DataStream, Output\CSVOutput, Output\JsonOutput, Response\FlowStreamedResponse};
use Flow\ETL\Config;
use Flow\ETL\Dataset\Report;
use Flow\ETL\Tests\FlowTestCase;

final class FlowStreamedResponseTest extends FlowTestCase
{
/**
 * With Analyze enabled, the completion closure must run and receive a Report
 * whose row count matches the streamed dataset.
 */
public function test_stream_closure_is_called_after_streaming_with_report() : void
{
    $invoked = false;
    $captured = null;

    $rows = from_array([
        ['id' => 1, 'name' => 'test'],
    ]);
    $listener = http_on_complete(
        function (?Report $report) use (&$invoked, &$captured) : void {
            $captured = $report;
            $invoked = true;
        }
    );

    $streamed = DataStream::open($rows)
        ->config(Config::builder()->analyze(analyze()))
        ->onComplete($listener)
        ->streamedResponse(new JsonOutput());

    $this->sendResponse($streamed);

    self::assertTrue($invoked);
    self::assertNotNull($captured);
    self::assertSame(1, $captured->statistics()->totalRows());
}

/**
 * Without Analyze configured, the completion closure must still run, but with null.
 * The non-null sentinel proves the closure actually overwrote the variable.
 */
public function test_stream_closure_receives_null_without_analyze() : void
{
    $captured = 'not_null';

    $rows = from_array([
        ['id' => 1, 'name' => 'test'],
    ]);
    $listener = http_on_complete(
        function (?Report $report) use (&$captured) : void {
            $captured = $report;
        }
    );

    $streamed = DataStream::open($rows)
        ->onComplete($listener)
        ->streamedResponse(new JsonOutput());

    $this->sendResponse($streamed);

    self::assertNull($captured);
}

/**
 * With analyze()->withSchema(), the Report passed to the closure must carry
 * both the row count and a schema describing the two columns.
 */
public function test_stream_closure_receives_report_with_schema_when_enabled() : void
{
    $captured = null;

    $rows = from_array([
        ['id' => 1, 'name' => 'test'],
        ['id' => 2, 'name' => 'test2'],
    ]);
    $listener = http_on_complete(
        function (?Report $report) use (&$captured) : void {
            $captured = $report;
        }
    );

    $streamed = DataStream::open($rows)
        ->config(Config::builder()->analyze(analyze()->withSchema()))
        ->onComplete($listener)
        ->streamedResponse(new JsonOutput());

    $this->sendResponse($streamed);

    self::assertNotNull($captured);
    self::assertSame(2, $captured->statistics()->totalRows());
    self::assertNotNull($captured->schema());
    self::assertCount(2, $captured->schema()->entries());
}

public function test_streaming_array_response_to_csv() : void
{
$response = new FlowStreamedResponse(
Expand Down
6 changes: 6 additions & 0 deletions src/core/etl/src/Flow/ETL/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public function __construct(
private EntryFactory $entryFactory,
public CacheConfig $cache,
public SortConfig $sort,
private ?Analyze $analyze = null,
) {
}

Expand All @@ -54,6 +55,11 @@ public static function default() : self
return self::builder()->build();
}

/**
* Return the Analyze settings this Config was constructed with,
* or null when none were provided.
*/
public function analyze() : ?Analyze
{
return $this->analyze;
}

public function clock() : ClockInterface
{
return $this->clock;
Expand Down
Loading
Loading