Skip to content

Commit f8aac34

Browse files
committed
feature: cursor based postgresql adapter
- added dsl and builder to cover cursors to postgresql library - added cursor based extractor
1 parent 048bc15 commit f8aac34

File tree

18 files changed

+970
-97
lines changed

18 files changed

+970
-97
lines changed

documentation/components/adapters/postgresql.md

Lines changed: 72 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,16 @@ This adapter provides:
3333

3434
### Extractors
3535

36-
Two extraction strategies optimized for different use cases:
36+
Three extraction strategies optimized for different use cases:
3737

38+
- **Server-Side Cursor**: True streaming extraction using PostgreSQL DECLARE CURSOR for maximum memory efficiency
3839
- **LIMIT/OFFSET Pagination**: Simple pagination suitable for smaller datasets
3940
- **Keyset (Cursor) Pagination**: Efficient pagination for large datasets with consistent performance
4041

41-
Both extractors support:
42+
All extractors support:
4243

4344
- Raw SQL strings or Query Builder objects
44-
- Configurable page sizes
45+
- Configurable batch/page sizes
4546
- Maximum row limits
4647
- Custom schema definitions
4748

@@ -54,6 +55,73 @@ A flexible loader supporting:
5455
- **DELETE**: Delete rows by primary key
5556
- **UPSERT**: ON CONFLICT handling for insert-or-update operations
5657

58+
## Extractor - Server-Side Cursor
59+
60+
The `from_pgsql_cursor` extractor uses PostgreSQL's native server-side cursors via `DECLARE CURSOR` + `FETCH`.
61+
This is the **only way** to achieve true low-memory streaming with PHP's ext-pgsql, as the extension
62+
has no unbuffered query mode.
63+
64+
> **Note:** This extractor automatically manages transactions. Cursors require a transaction context,
65+
> which is auto-started if not already in one.
66+
67+
### Basic Usage
68+
69+
```php
70+
use function Flow\ETL\Adapter\PostgreSql\from_pgsql_cursor;
71+
use function Flow\PostgreSql\DSL\{pgsql_client, pgsql_connection_dsn};
72+
73+
$client = pgsql_client(pgsql_connection_dsn('pgsql://user:pass@localhost:5432/database'));
74+
75+
data_frame()
76+
->read(from_pgsql_cursor(
77+
$client,
78+
"SELECT id, name, email FROM users",
79+
fetchSize: 1000
80+
))
81+
->write(to_output())
82+
->run();
83+
```
84+
85+
### With Query Builder
86+
87+
```php
88+
use function Flow\ETL\Adapter\PostgreSql\from_pgsql_cursor;
89+
use function Flow\PostgreSql\DSL\{col, select, star, table};
90+
91+
data_frame()
92+
->read(from_pgsql_cursor(
93+
$client,
94+
select(star())->from(table('large_table')),
95+
fetchSize: 500
96+
))
97+
->write(to_output())
98+
->run();
99+
```
100+
101+
### With Parameters
102+
103+
```php
104+
use function Flow\ETL\Adapter\PostgreSql\from_pgsql_cursor;
105+
106+
data_frame()
107+
->read(from_pgsql_cursor(
108+
$client,
109+
"SELECT * FROM orders WHERE status = $1 AND created_at > $2",
110+
parameters: ['pending', '2024-01-01'],
111+
fetchSize: 1000
112+
))
113+
->write(to_output())
114+
->run();
115+
```
116+
117+
### When to Use Each Extractor
118+
119+
| Extractor | Best For | Memory | ORDER BY Required |
120+
|---------------------------|-------------------------------------|------------------------|-------------------|
121+
| `from_pgsql_cursor` | Very large datasets, true streaming | Lowest (server-side) | No |
122+
| `from_pgsql_key_set` | Large datasets with indexed keys | Medium (page buffered) | Auto-generated |
123+
| `from_pgsql_limit_offset` | Small-medium datasets | Medium (page buffered) | Yes |
124+
57125
## Extractor - LIMIT/OFFSET Pagination
58126

59127
The `from_pgsql_limit_offset` extractor uses traditional LIMIT/OFFSET pagination. This is simple to use but may have
@@ -195,6 +263,7 @@ data_frame()
195263

196264
| Function | Description |
197265
|---------------------------------------------------------------------|---------------------------------------|
266+
| `from_pgsql_cursor($client, $query, $parameters, $fetchSize, $max)` | Extract using server-side cursor |
198267
| `from_pgsql_limit_offset($client, $query, $pageSize, $maximum)` | Extract using LIMIT/OFFSET pagination |
199268
| `from_pgsql_key_set($client, $query, $keySet, $pageSize, $maximum)` | Extract using keyset pagination |
200269

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Flow\ETL\Adapter\PostgreSql;
6+
7+
use function Flow\ETL\DSL\array_to_rows;
8+
use function Flow\PostgreSql\DSL\{close_cursor, declare_cursor, fetch};
9+
use Flow\ETL\Exception\InvalidArgumentException;
10+
use Flow\ETL\Extractor\Signal;
11+
use Flow\ETL\{Extractor, FlowContext, Schema};
12+
use Flow\PostgreSql\Client\Client;
13+
use Flow\PostgreSql\QueryBuilder\SqlQuery;
14+
15+
/**
16+
* PostgreSQL extractor using server-side cursors for memory-efficient extraction.
17+
*
18+
* Uses DECLARE CURSOR + FETCH to stream data from PostgreSQL without loading
19+
* the entire result set into memory. This is the only way to achieve true
20+
* low memory extraction with PHP's ext-pgsql.
21+
*
22+
* Note: Requires a transaction context (auto-started if not in one).
23+
*/
24+
final class PostgreSqlCursorExtractor implements Extractor
25+
{
26+
private ?string $cursorName = null;
27+
28+
private int $fetchSize = 1000;
29+
30+
private ?int $maximum = null;
31+
32+
private ?Schema $schema = null;
33+
34+
/**
35+
* @param array<int, mixed> $parameters
36+
*/
37+
public function __construct(
38+
private readonly Client $client,
39+
private readonly string|SqlQuery $query,
40+
private readonly array $parameters = [],
41+
) {
42+
}
43+
44+
public function extract(FlowContext $context) : \Generator
45+
{
46+
$cursorName = $this->cursorName ?? 'flow_cursor_' . \bin2hex(\random_bytes(8));
47+
48+
$ownTransaction = $this->client->getTransactionNestingLevel() === 0;
49+
50+
if ($ownTransaction) {
51+
$this->client->beginTransaction();
52+
}
53+
54+
try {
55+
$this->client->execute(
56+
declare_cursor($cursorName, $this->query),
57+
$this->parameters
58+
);
59+
60+
$totalFetched = 0;
61+
62+
while (true) {
63+
$cursor = $this->client->cursor(fetch($cursorName)->forward($this->fetchSize));
64+
$hasRows = false;
65+
66+
foreach ($cursor->iterate() as $row) {
67+
$hasRows = true;
68+
$signal = yield array_to_rows($row, $context->entryFactory(), [], $this->schema);
69+
70+
if ($signal === Signal::STOP) {
71+
$cursor->free();
72+
73+
return;
74+
}
75+
76+
$totalFetched++;
77+
78+
if ($this->maximum !== null && $totalFetched >= $this->maximum) {
79+
$cursor->free();
80+
81+
return;
82+
}
83+
}
84+
85+
$cursor->free();
86+
87+
if (!$hasRows) {
88+
break;
89+
}
90+
}
91+
} finally {
92+
$this->client->execute(close_cursor($cursorName));
93+
94+
if ($ownTransaction) {
95+
$this->client->commit();
96+
}
97+
}
98+
}
99+
100+
public function withCursorName(string $cursorName) : self
101+
{
102+
$this->cursorName = $cursorName;
103+
104+
return $this;
105+
}
106+
107+
public function withFetchSize(int $fetchSize) : self
108+
{
109+
if ($fetchSize <= 0) {
110+
throw new InvalidArgumentException('Fetch size must be greater than 0, got ' . $fetchSize);
111+
}
112+
113+
$this->fetchSize = $fetchSize;
114+
115+
return $this;
116+
}
117+
118+
public function withMaximum(int $maximum) : self
119+
{
120+
if ($maximum <= 0) {
121+
throw new InvalidArgumentException('Maximum must be greater than 0, got ' . $maximum);
122+
}
123+
124+
$this->maximum = $maximum;
125+
126+
return $this;
127+
}
128+
129+
public function withSchema(Schema $schema) : self
130+
{
131+
$this->schema = $schema;
132+
133+
return $this;
134+
}
135+
}

src/adapter/etl-adapter-postgresql/src/Flow/ETL/Adapter/PostgreSql/functions.php

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,38 @@
1010
use Flow\PostgreSql\Client\Client;
1111
use Flow\PostgreSql\QueryBuilder\SqlQuery;
1212

13+
/**
14+
* Create a PostgreSQL cursor extractor using server-side cursors for memory-efficient extraction.
15+
*
16+
* Uses DECLARE CURSOR + FETCH to stream data without loading entire result set into memory.
17+
* This is the only way to achieve true low memory extraction with PHP's ext-pgsql.
18+
*
19+
* Note: Requires a transaction context (auto-started if not in one).
20+
*
21+
* @param Client $client PostgreSQL client
22+
* @param SqlQuery|string $query SQL query to execute (wrapped in DECLARE CURSOR)
23+
* @param array<int, mixed> $parameters Positional parameters for the query
24+
* @param int $fetchSize Number of rows to fetch per batch (default: 1000)
25+
* @param null|int $maximum Maximum number of rows to extract (null for unlimited)
26+
*/
27+
#[DocumentationDSL(module: Module::POSTGRESQL, type: DSLType::EXTRACTOR)]
28+
function from_pgsql_cursor(
29+
Client $client,
30+
string|SqlQuery $query,
31+
array $parameters = [],
32+
int $fetchSize = 1000,
33+
?int $maximum = null,
34+
) : PostgreSqlCursorExtractor {
35+
$extractor = (new PostgreSqlCursorExtractor($client, $query, $parameters))
36+
->withFetchSize($fetchSize);
37+
38+
if ($maximum !== null) {
39+
$extractor->withMaximum($maximum);
40+
}
41+
42+
return $extractor;
43+
}
44+
1345
#[DocumentationDSL(module: Module::POSTGRESQL, type: DSLType::EXTRACTOR)]
1446
function from_pgsql_limit_offset(
1547
Client $client,

src/adapter/etl-adapter-postgresql/tests/Flow/ETL/Adapter/PostgreSql/Tests/Benchmark/PostgreSqlExtractorBench.php

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
namespace Flow\ETL\Adapter\PostgreSql\Tests\Benchmark;
66

7-
use function Flow\ETL\Adapter\PostgreSql\{from_pgsql_key_set, from_pgsql_limit_offset, pgsql_pagination_key_asc, pgsql_pagination_key_set, to_pgsql_table};
7+
use function Flow\ETL\Adapter\PostgreSql\{from_pgsql_cursor, from_pgsql_key_set, from_pgsql_limit_offset, pgsql_pagination_key_asc, pgsql_pagination_key_set, to_pgsql_table};
88
use function Flow\ETL\DSL\{config, df, flow_context};
99
use function Flow\PostgreSql\DSL\{asc, col, column, create, data_type_double_precision, data_type_integer, data_type_jsonb, data_type_text, data_type_timestamptz, data_type_uuid, drop, pgsql_client, pgsql_connection_dsn, pgsql_mapper, select, star, table};
1010
use Flow\ETL\Tests\Double\FakeStaticOrdersExtractor;
@@ -50,6 +50,18 @@ public function __destruct()
5050
$this->client->close();
5151
}
5252

53+
public function bench_extract_10k_cursor() : void
54+
{
55+
$context = flow_context(config());
56+
57+
foreach (from_pgsql_cursor(
58+
$this->client,
59+
select(star())->from(table(self::TABLE_NAME)),
60+
fetchSize: 1000
61+
)->extract($context) as $rows) {
62+
}
63+
}
64+
5365
public function bench_extract_10k_keyset() : void
5466
{
5567
$context = flow_context(config());

0 commit comments

Comments
 (0)