Skip to content

Commit be1db54

Browse files
authored
Doctrine Dbal Transactional Loader (#1843)
* Doctrine Dbal Transactional Loader * Updated DSL definitions * Fixed tests failing at lowest version of dependencies * Updated landing page dependencies, fixed failing build
1 parent 4380d73 commit be1db54

File tree

22 files changed

+940
-163
lines changed

22 files changed

+940
-163
lines changed

composer.lock

Lines changed: 35 additions & 34 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/adapter/etl-adapter-doctrine/src/Flow/ETL/Adapter/Doctrine/DbalKeySetExtractor.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public function extract(FlowContext $context) : \Generator
9797
$conditions[] = $qb->expr()->and(...$subConditions);
9898
}
9999

100-
if ($conditions) {
100+
if (\count($conditions) > 0) {
101101
$qb->andWhere($qb->expr()->or(...$conditions));
102102

103103
foreach ($parameters as $param => $value) {
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Flow\ETL\Adapter\Doctrine;
6+
7+
use Doctrine\DBAL\{Connection, DriverManager, TransactionIsolationLevel};
8+
use Flow\ETL\Exception\InvalidArgumentException;
9+
use Flow\ETL\{FlowContext, Loader, Rows};
10+
11+
final class TransactionalDbalLoader implements Loader
12+
{
13+
private ?Connection $connection = null;
14+
15+
private TransactionIsolationLevel|int|null $isolationLevel = null;
16+
17+
/**
18+
* @var array<DbalLoader>
19+
*/
20+
private readonly array $loaders;
21+
22+
/**
23+
* @param array<string, mixed> $connectionParams
24+
* @param DbalLoader ...$loaders
25+
*/
26+
public function __construct(
27+
private readonly array $connectionParams,
28+
DbalLoader ...$loaders,
29+
) {
30+
if (\count($loaders) === 0) {
31+
throw new InvalidArgumentException('At least one loader must be provided');
32+
}
33+
34+
$this->loaders = $loaders;
35+
}
36+
37+
/**
38+
* Since Connection::getParams() is marked as an internal method, please
39+
* use this constructor with caution.
40+
*/
41+
public static function fromConnection(
42+
Connection $connection,
43+
DbalLoader ...$loaders,
44+
) : self {
45+
$loader = new self($connection->getParams(), ...$loaders);
46+
$loader->connection = $connection;
47+
48+
return $loader;
49+
}
50+
51+
public function load(Rows $rows, FlowContext $context) : void
52+
{
53+
$this->executeInTransaction($this->connection(), $rows, $context);
54+
}
55+
56+
public function withIsolationLevel(TransactionIsolationLevel|int $level) : self
57+
{
58+
$this->isolationLevel = $level;
59+
60+
return $this;
61+
}
62+
63+
private function connection() : Connection
64+
{
65+
if ($this->connection === null) {
66+
/** @phpstan-ignore-next-line */
67+
$this->connection = DriverManager::getConnection($this->connectionParams);
68+
}
69+
70+
return $this->connection;
71+
}
72+
73+
private function executeInTransaction(Connection $connection, Rows $rows, FlowContext $context) : void
74+
{
75+
$previousIsolationLevel = null;
76+
77+
if ($this->isolationLevel !== null) {
78+
$previousIsolationLevel = $connection->getTransactionIsolation();
79+
/** @phpstan-ignore-next-line */
80+
$connection->setTransactionIsolation($this->isolationLevel);
81+
}
82+
83+
try {
84+
$connection->beginTransaction();
85+
86+
try {
87+
foreach ($this->loaders as $loader) {
88+
$loader->load($rows, $context);
89+
}
90+
91+
$connection->commit();
92+
} catch (\Throwable $e) {
93+
$connection->rollBack();
94+
95+
throw $e;
96+
}
97+
} finally {
98+
if ($previousIsolationLevel !== null) {
99+
$connection->setTransactionIsolation($previousIsolationLevel);
100+
}
101+
}
102+
}
103+
}

src/adapter/etl-adapter-doctrine/src/Flow/ETL/Adapter/Doctrine/functions.php

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
Attribute\DocumentationExample,
2525
Attribute\Module,
2626
Attribute\Type as DSLType,
27+
Loader,
2728
Schema};
2829
use Flow\ETL\Exception\InvalidArgumentException;
2930

@@ -325,6 +326,26 @@ function postgresql_update_options(
325326
);
326327
}
327328

329+
/**
330+
* Execute multiple loaders within a database transaction.
331+
* Each batch of rows will be processed in its own transaction.
332+
* If any loader fails, the entire batch will be rolled back.
333+
*
334+
* @param array<string, mixed>|Connection $connection
335+
* @param DbalLoader ...$loaders - DBAL loaders to execute within the transaction
336+
*
337+
* @throws InvalidArgumentException
338+
*/
339+
#[DocumentationDSL(module: Module::DOCTRINE, type: DSLType::LOADER)]
340+
function to_dbal_transaction(
341+
array|Connection $connection,
342+
DbalLoader ...$loaders,
343+
) : TransactionalDbalLoader {
344+
return \is_array($connection)
345+
? new TransactionalDbalLoader($connection, ...$loaders)
346+
: TransactionalDbalLoader::fromConnection($connection, ...$loaders);
347+
}
348+
328349
#[DocumentationDSL(module: Module::DOCTRINE, type: DSLType::HELPER)]
329350
function pagination_key_asc(string $column, string|int|ParameterType|Type $type = ParameterType::STRING) : Key
330351
{

0 commit comments

Comments
 (0)