Commit 21ce0ec

feature: flow postgresql adapter - loaders
1 parent 6542d44 commit 21ce0ec

28 files changed: +1657 -92 lines changed
.github/workflows/job-benchmark-tests.yml

Lines changed: 18 additions & 1 deletion
@@ -14,6 +14,17 @@ jobs:
         operating-system:
           - "ubuntu-latest"

+    services:
+      postgres:
+        image: postgres:18-alpine
+        env:
+          POSTGRES_USER: postgres
+          POSTGRES_PASSWORD: postgres
+          POSTGRES_DB: postgres
+        ports:
+          - 5432/tcp
+        options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5
+
     steps:
       - name: "Checkout"
         uses: "actions/checkout@v5"
@@ -24,8 +35,10 @@ jobs:
           php-version: "${{ matrix.php-version }}"
           dependencies: "locked"
           coverage: "none"
-          extensions: ':psr, bcmath, dom, hash, json, mbstring, xml, xmlwriter, xmlreader, zlib, snappy-https://github.com/kjdev/[email protected]'
+          extensions: ':psr, bcmath, dom, hash, json, mbstring, xml, xmlwriter, xmlreader, zlib, pgsql, snappy-https://github.com/kjdev/[email protected]'
           php-env: '{"SNAPPY_CONFIGURE_PREFIX_OPTS": "CXXFLAGS=-std=c++11"}'
+          apt-packages: "build-essential autoconf automake libtool protobuf-compiler libprotobuf-c-dev"
+          pie-extensions: "flow-php/pg-query-ext:1.x-dev"
           cache-key-suffix: "-locked"

       - name: "Download phpbench benchmarks artifact"
@@ -35,6 +48,10 @@ jobs:
           name: phpbench-baseline
           path: ./var/phpbench/

+      - name: "Generate phpbench.json with dynamic port"
+        run: |
+          echo '{"runner.php_env": {"PGSQL_DATABASE_URL": "pgsql://postgres:[email protected]:${{ job.services.postgres.ports[5432] }}/postgres?serverVersion=11&charset=utf8"}}' > phpbench.json
+
       - name: "Execute benchmarks"
         id: init_comment
         run: |

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -4,4 +4,5 @@ var
 vendor
 compose.yml
 phpunit.xml
+phpbench.json
 .env

documentation/components/adapters/postgresql.md

Lines changed: 156 additions & 33 deletions
@@ -8,7 +8,12 @@

 [TOC]

-Flow PHP's Adapter PostgreSQL is designed to seamlessly integrate PostgreSQL within your ETL (Extract, Transform, Load) workflows. This adapter is built on top of the [PostgreSQL library](/documentation/components/libs/postgresql/client-connection.md), providing efficient data extraction capabilities with built-in pagination support. By harnessing the Adapter PostgreSQL library, developers can tap into robust features for precise database interaction, simplifying complex data transformations and enhancing data processing efficiency.
+Flow PHP's Adapter PostgreSQL is designed to seamlessly integrate PostgreSQL within your ETL (Extract, Transform, Load)
+workflows. This adapter is built on top of
+the [PostgreSQL library](/documentation/components/libs/postgresql/client-connection.md), providing efficient data
+extraction and loading capabilities. By harnessing the Adapter PostgreSQL library, developers can tap into robust
+features for precise database interaction, simplifying complex data transformations and enhancing data processing
+efficiency.

 ## Installation

@@ -24,20 +29,35 @@ composer require flow-php/etl-adapter-postgresql:~--FLOW_PHP_VERSION--

 ## Description

-This adapter provides two extraction strategies optimized for different use cases:
+This adapter provides:
+
+### Extractors
+
+Two extraction strategies optimized for different use cases:

 - **LIMIT/OFFSET Pagination**: Simple pagination suitable for smaller datasets
 - **Keyset (Cursor) Pagination**: Efficient pagination for large datasets with consistent performance

 Both extractors support:
+
 - Raw SQL strings or Query Builder objects
 - Configurable page sizes
 - Maximum row limits
 - Custom schema definitions

+### Loader
+
+A flexible loader supporting:
+
+- **INSERT**: Simple inserts with batch support
+- **UPDATE**: Update existing rows by primary key
+- **DELETE**: Delete rows by primary key
+- **UPSERT**: ON CONFLICT handling for insert-or-update operations
+
 ## Extractor - LIMIT/OFFSET Pagination

-The `from_pgsql_limit_offset` extractor uses traditional LIMIT/OFFSET pagination. This is simple to use but may have performance degradation on very large datasets with high offsets.
+The `from_pgsql_limit_offset` extractor uses traditional LIMIT/OFFSET pagination. This is simple to use but may have
+performance degradation on very large datasets with high offsets.

 ### Basic Usage

@@ -93,7 +113,11 @@ data_frame()

 ## Extractor - Keyset (Cursor) Pagination

-The `from_pgsql_key_set` extractor uses keyset pagination (also known as cursor-based pagination). This provides consistent performance regardless of how deep you paginate, making it ideal for large datasets.
+The `from_pgsql_key_set` extractor uses keyset pagination (also known as cursor-based pagination). This provides
+consistent performance regardless of how deep you paginate, making it ideal for large datasets.
+
+> **Note:** The ORDER BY clause is automatically generated from the keyset configuration. You only need to define
+> the sort order once using `pgsql_pagination_key_asc()` or `pgsql_pagination_key_desc()`.

 ### Basic Usage

@@ -169,44 +193,143 @@ data_frame()

 ### Extractor Functions

-| Function | Description |
-|----------|-------------|
-| `from_pgsql_limit_offset($client, $query, $pageSize, $maximum)` | Extract using LIMIT/OFFSET pagination |
-| `from_pgsql_key_set($client, $query, $keySet, $pageSize, $maximum)` | Extract using keyset pagination |
+| Function                                                            | Description                           |
+|---------------------------------------------------------------------|---------------------------------------|
+| `from_pgsql_limit_offset($client, $query, $pageSize, $maximum)`     | Extract using LIMIT/OFFSET pagination |
+| `from_pgsql_key_set($client, $query, $keySet, $pageSize, $maximum)` | Extract using keyset pagination       |

 ### Key Functions

-| Function | Description |
-|----------|-------------|
-| `pgsql_pagination_key_asc($column)` | Create an ascending key for keyset pagination |
+| Function                             | Description                                   |
+|--------------------------------------|-----------------------------------------------|
+| `pgsql_pagination_key_asc($column)`  | Create an ascending key for keyset pagination |
 | `pgsql_pagination_key_desc($column)` | Create a descending key for keyset pagination |
-| `pgsql_pagination_key_set(...$keys)` | Create a keyset from one or more keys |
+| `pgsql_pagination_key_set(...$keys)` | Create a keyset from one or more keys         |
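As an illustration of what the keyset functions in the table above configure, here is a minimal plain-PHP sketch of keyset pagination over an in-memory array. The helper `keyset_page()` and the sample rows are hypothetical and are not part of Flow PHP; the sketch only shows how the last key of one page becomes the cursor for the next, which a database would express as `WHERE id > :last ORDER BY id ASC LIMIT :size`.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not a Flow PHP function): returns one page of rows
 * whose `id` is greater than the last id already seen.
 *
 * @param list<array{id: int, name: string}> $rows
 * @return list<array{id: int, name: string}>
 */
function keyset_page(array $rows, ?int $lastId, int $pageSize): array
{
    // Equivalent of ORDER BY id ASC.
    usort($rows, static fn (array $a, array $b): int => $a['id'] <=> $b['id']);

    // Equivalent of WHERE id > :last (no filter on the first page).
    $filtered = array_filter(
        $rows,
        static fn (array $row): bool => $lastId === null || $row['id'] > $lastId
    );

    // Equivalent of LIMIT :size.
    return array_slice(array_values($filtered), 0, $pageSize);
}

$rows = [
    ['id' => 3, 'name' => 'c'],
    ['id' => 1, 'name' => 'a'],
    ['id' => 4, 'name' => 'd'],
    ['id' => 2, 'name' => 'b'],
];

$lastId = null;
$pages = [];

while (($page = keyset_page($rows, $lastId, 2)) !== []) {
    $pages[] = array_column($page, 'id');
    // The last key of the current page becomes the cursor for the next page.
    $lastId = $page[array_key_last($page)]['id'];
}
```

The in-memory version still scans every row; the benefit described in the documentation comes from a database index on the key column, which lets the server seek directly to the cursor instead of skipping rows as it does with a large OFFSET.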
+
+## Loader
+
+The `to_pgsql_table` loader writes data to PostgreSQL tables. It supports INSERT, UPDATE, and DELETE operations with
+configurable conflict handling.
+
+### Basic Insert
+
+```php
+use function Flow\ETL\Adapter\PostgreSql\to_pgsql_table;
+use function Flow\ETL\DSL\{df, from_array};
+use function Flow\PostgreSql\DSL\{pgsql_client, pgsql_connection_dsn};
+
+$client = pgsql_client(pgsql_connection_dsn('pgsql://user:pass@localhost:5432/database'));
+
+df()
+    ->read(from_array([
+        ['id' => 1, 'name' => 'Alice', 'email' => '[email protected]'],
+        ['id' => 2, 'name' => 'Bob', 'email' => '[email protected]'],
+    ]))
+    ->write(to_pgsql_table($client, 'users'))
+    ->run();
+```
+
+### Insert with Skip Conflicts (ON CONFLICT DO NOTHING)
+
+Skip rows that would cause a constraint violation:
+
+```php
+use function Flow\ETL\Adapter\PostgreSql\{pgsql_insert_options, to_pgsql_table};
+
+df()
+    ->read(from_array($data))
+    ->write(
+        to_pgsql_table($client, 'users')
+            ->withInsertOptions(pgsql_insert_options(skipConflicts: true))
+    )
+    ->run();
+```

-## Choosing Between Extractors
+### Upsert (ON CONFLICT DO UPDATE)

-### Use LIMIT/OFFSET when:
-- Working with smaller datasets (< 100k rows)
-- You need simple, straightforward pagination
-- Random page access is required
-- The offset values remain relatively small
+Update existing rows on conflict using specific columns:

-### Use Keyset Pagination when:
-- Working with large datasets (100k+ rows)
-- Performance consistency is critical
-- You're doing sequential/forward pagination
-- Your table has suitable indexed columns for the keyset
+```php
+use function Flow\ETL\Adapter\PostgreSql\{pgsql_insert_options, to_pgsql_table};
+
+df()
+    ->read(from_array($data))
+    ->write(
+        to_pgsql_table($client, 'users')
+            ->withInsertOptions(pgsql_insert_options(
+                conflictColumns: ['email'], // Detect conflicts on these columns
+                updateColumns: ['name', 'updated_at'] // Update these columns on conflict
+            ))
+    )
+    ->run();
+```

-## Performance Considerations
+### Upsert on Constraint
+
+Use a named constraint for conflict detection:
+
+```php
+use function Flow\ETL\Adapter\PostgreSql\{pgsql_insert_options, to_pgsql_table};
+
+df()
+    ->read(from_array($data))
+    ->write(
+        to_pgsql_table($client, 'users')
+            ->withInsertOptions(pgsql_insert_options(
+                conflictConstraint: 'users_email_key',
+                updateColumns: ['name']
+            ))
+    )
+    ->run();
+```
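To make the upsert options above more concrete, the following sketch builds the kind of PostgreSQL statement that `conflictColumns` and `updateColumns` correspond to. The function `sketch_upsert_sql()` is hypothetical and is not how the loader is implemented; it only assumes standard PostgreSQL `INSERT ... ON CONFLICT` syntax, where `EXCLUDED` refers to the row proposed for insertion.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of Flow PHP): builds a parameterised
 * INSERT ... ON CONFLICT statement for a single row.
 *
 * @param list<string> $columns
 * @param list<string> $conflictColumns
 * @param list<string> $updateColumns
 */
function sketch_upsert_sql(string $table, array $columns, array $conflictColumns, array $updateColumns): string
{
    // Double-quote identifiers and escape embedded double quotes.
    $quote = static fn (string $name): string => '"' . str_replace('"', '""', $name) . '"';

    // One positional placeholder per column: $1, $2, ...
    $placeholders = array_map(static fn (int $i): string => '$' . ($i + 1), array_keys($columns));

    $sql = sprintf(
        'INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s)',
        $quote($table),
        implode(', ', array_map($quote, $columns)),
        implode(', ', $placeholders),
        implode(', ', array_map($quote, $conflictColumns))
    );

    // Without columns to update, conflicting rows are skipped.
    if ($updateColumns === []) {
        return $sql . ' DO NOTHING';
    }

    // EXCLUDED refers to the row that was proposed for insertion.
    $assignments = array_map(
        static fn (string $column): string => $quote($column) . ' = EXCLUDED.' . $quote($column),
        $updateColumns
    );

    return $sql . ' DO UPDATE SET ' . implode(', ', $assignments);
}

$sql = sketch_upsert_sql('users', ['id', 'name', 'email'], ['email'], ['name']);
```

A named constraint, as in the `conflictConstraint` example, would use `ON CONFLICT ON CONSTRAINT users_email_key` in place of the column list.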

-### LIMIT/OFFSET
+### Update Existing Rows

-- Simple to understand and implement
-- Performance degrades as offset increases (PostgreSQL must skip all previous rows)
-- Memory usage increases with larger offsets
+Update rows matching primary key values:
+
+```php
+use function Flow\ETL\Adapter\PostgreSql\{pgsql_update_options, to_pgsql_table};
+use Flow\ETL\Adapter\PostgreSql\Operation;
+
+df()
+    ->read(from_array([
+        ['id' => 1, 'name' => 'Alice Updated', 'email' => '[email protected]'],
+        ['id' => 2, 'name' => 'Bob Updated', 'email' => '[email protected]'],
+    ]))
+    ->write(
+        to_pgsql_table($client, 'users')
+            ->withOperation(Operation::UPDATE)
+            ->withUpdateOptions(pgsql_update_options(['id'])) // Match on 'id' column
+    )
+    ->run();
+```
+
+### Delete Rows
+
+Delete rows matching primary key values:
+
+```php
+use function Flow\ETL\Adapter\PostgreSql\{pgsql_delete_options, to_pgsql_table};
+use Flow\ETL\Adapter\PostgreSql\Operation;
+
+df()
+    ->read(from_array([
+        ['id' => 1],
+        ['id' => 3],
+    ]))
+    ->write(
+        to_pgsql_table($client, 'users')
+            ->withOperation(Operation::DELETE)
+            ->withDeleteOptions(pgsql_delete_options(['id']))
+    )
+    ->run();
+```

-### Keyset Pagination
+## Loader DSL Functions Reference

-- Consistent O(1) performance regardless of position
-- Requires indexed columns in the keyset
-- Cannot jump to arbitrary pages (sequential access only)
-- Handles concurrent modifications more gracefully
+| Function                             | Description                                     |
+|--------------------------------------|-------------------------------------------------|
+| `to_pgsql_table($client, $table)`    | Create a PostgreSQL loader for a table          |
+| `pgsql_insert_options(...)`          | Configure insert behavior (conflicts, upsert)   |
+| `pgsql_update_options($primaryKeys)` | Configure update behavior (primary key columns) |
+| `pgsql_delete_options($primaryKeys)` | Configure delete behavior (primary key columns) |

phpbench.json renamed to phpbench.json.dist

Lines changed: 5 additions & 0 deletions
@@ -17,15 +17,20 @@
     },
     "runner.path": [
         "src/adapter/etl-adapter-csv/tests/Flow/ETL/Adapter/CSV/Tests/Benchmark/",
+        "src/adapter/etl-adapter-doctrine/tests/Flow/ETL/Adapter/Doctrine/Tests/Benchmark/",
         "src/adapter/etl-adapter-excel/tests/Flow/ETL/Adapter/Excel/Tests/Benchmark/",
         "src/adapter/etl-adapter-json/tests/Flow/ETL/Adapter/JSON/Tests/Benchmark/",
         "src/adapter/etl-adapter-parquet/tests/Flow/ETL/Adapter/Parquet/Tests/Benchmark/",
+        "src/adapter/etl-adapter-postgresql/tests/Flow/ETL/Adapter/PostgreSql/Tests/Benchmark/",
         "src/adapter/etl-adapter-text/tests/Flow/ETL/Adapter/Text/Tests/Benchmark/",
         "src/adapter/etl-adapter-xml/tests/Flow/ETL/Adapter/XML/Tests/Benchmark/",
         "src/core/etl/tests/Flow/ETL/Tests/Benchmark/",
         "src/lib/parquet/tests/Flow/Parquet/Tests/Benchmark/"
     ],
     "runner.php_config": { "memory_limit": "1G" },
+    "runner.php_env": {
+        "PGSQL_DATABASE_URL": "pgsql://postgres:[email protected]:5432/postgres?serverVersion=11&charset=utf8"
+    },
     "runner.iterations": 3,
     "runner.retry_threshold": 5,
     "storage.xml_storage_path": "var/phpbench"
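
The `runner.php_env` entry passes `PGSQL_DATABASE_URL` to the benchmark processes, and the CI workflow rewrites it with the dynamically mapped service port. A minimal sketch of how such a URL could be split into connection settings with PHP's built-in `parse_url()` and `parse_str()` is shown below. The helper `parse_database_url()` and the sample URL (including port 49153) are assumptions for illustration only and do not appear in the commit.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of Flow PHP): splits a database URL into
 * its connection settings.
 *
 * @return array{host: string, port: int, user: ?string, password: ?string, database: string, options: array}
 */
function parse_database_url(string $url): array
{
    $parts = parse_url($url);

    if ($parts === false || !isset($parts['host'])) {
        throw new InvalidArgumentException('The database URL could not be parsed.');
    }

    // Turn the query string (for example serverVersion, charset) into an array.
    parse_str($parts['query'] ?? '', $options);

    return [
        'host' => $parts['host'],
        'port' => $parts['port'] ?? 5432, // default PostgreSQL port
        'user' => $parts['user'] ?? null,
        'password' => $parts['pass'] ?? null,
        'database' => ltrim($parts['path'] ?? '', '/'),
        'options' => $options,
    ];
}

// Placeholder URL; a benchmark would normally read it with getenv('PGSQL_DATABASE_URL').
$connection = parse_database_url('pgsql://postgres:postgres@127.0.0.1:49153/postgres?serverVersion=11&charset=utf8');
```

Because the workflow publishes `5432/tcp` without a fixed host port, the runner picks a free port at job start, which is why the URL is generated per run instead of being committed.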
