Skip to content

Commit 9f4f219

Browse files
authored
Fixed key set pagination when key column was ambiguous (#1613)
* Fixed key set pagination when key column was ambiguous * Fixed query builder in key set pagination for multiple keys scenario * CS Fixes
1 parent 40c7eae commit 9f4f219

File tree

2 files changed

+153
-7
lines changed

2 files changed

+153
-7
lines changed

src/adapter/etl-adapter-doctrine/src/Flow/ETL/Adapter/Doctrine/DbalKeySetExtractor.php

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
*/
2121
final class DbalKeySetExtractor implements Extractor
2222
{
23+
private string $keyAliasSuffix = '_previous';
24+
2325
private ?int $maximum = null;
2426

2527
private int $pageSize = 1000;
@@ -56,6 +58,7 @@ public function extract(FlowContext $context) : \Generator
5658

5759
foreach ($this->keySet->keys as $key) {
5860
$qb->addOrderBy($key->column, $key->order->value);
61+
$qb->addSelect($key->column . ' AS ' . $this->keyAlias($key));
5962
}
6063

6164
if ($lastRow !== null) {
@@ -64,28 +67,30 @@ public function extract(FlowContext $context) : \Generator
6467
$parameterTypes = [];
6568

6669
foreach ($this->keySet->keys as $index => $key) {
67-
if (!\array_key_exists($key->column, $lastRow)) {
70+
$keyAlias = $this->keyAlias($key);
71+
72+
if (!\array_key_exists($keyAlias, $lastRow)) {
6873
throw new RuntimeException(sprintf('Column "%s" not found in last row for keyset pagination', $key->column));
6974
}
7075

71-
$lastValue = $lastRow[$key->column];
76+
$lastValue = $lastRow[$keyAlias];
7277

7378
if ($lastValue === null) {
7479
throw new RuntimeException(sprintf('NULL value found in column "%s" for keyset pagination; key columns must be non-null', $key->column));
7580
}
7681

77-
$paramName = $key->column . '_previous';
78-
$parameters[$paramName] = $lastValue;
79-
$parameterTypes[$paramName] = $key->type;
82+
$parameters[$keyAlias] = $lastValue;
83+
$parameterTypes[$keyAlias] = $key->type;
8084

8185
$subConditions = [];
8286

8387
for ($i = 0; $i < $index; $i++) {
8488
$prevKey = $this->keySet->keys[$i];
85-
$subConditions[] = $qb->expr()->eq($prevKey->column, ':' . $prevKey->column . '_previous');
89+
$subConditions[] = $qb->expr()->eq($prevKey->column, ':' . $this->keyAlias($prevKey));
8690
}
91+
8792
$operator = $key->order->value === 'DESC' ? 'lt' : 'gt';
88-
$subConditions[] = $qb->expr()->{$operator}($key->column, ':' . $paramName);
93+
$subConditions[] = $qb->expr()->{$operator}($key->column, ':' . $keyAlias);
8994

9095
$conditions[] = $qb->expr()->and(...$subConditions);
9196
}
@@ -112,6 +117,14 @@ public function extract(FlowContext $context) : \Generator
112117
$hasRows = true;
113118
$lastRow = $row;
114119

120+
foreach ($this->keySet->keys as $key) {
121+
$keyAlias = $this->keyAlias($key);
122+
123+
if (\array_key_exists($keyAlias, $row)) {
124+
unset($row[$keyAlias]);
125+
}
126+
}
127+
115128
$signal = yield array_to_rows($row, $context->entryFactory(), [], $this->schema);
116129

117130
if ($signal === Extractor\Signal::STOP) {
@@ -131,6 +144,13 @@ public function extract(FlowContext $context) : \Generator
131144
}
132145
}
133146

147+
public function withKeyAliasSuffix(string $keyAliasSuffix) : self
148+
{
149+
$this->keyAliasSuffix = $keyAliasSuffix;
150+
151+
return $this;
152+
}
153+
134154
/**
135155
* Sets the maximum number of rows to fetch.
136156
*
@@ -184,4 +204,9 @@ public function withSchema(Schema $schema) : self
184204

185205
return $this;
186206
}
207+
208+
private function keyAlias(Pagination\Key $key) : string
209+
{
210+
return (string) preg_replace('/[^a-zA-Z0-9_]/', '_', str_replace('.', '_', $key->column . $this->keyAliasSuffix));
211+
}
187212
}

src/adapter/etl-adapter-doctrine/tests/Flow/ETL/Adapter/Doctrine/Tests/Integration/DbalKeySetExtractorTest.php

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,127 @@ public function test_extracting_with_duplicate_key_values_and_tiebreaker() : voi
266266
);
267267
}
268268

269+
public function test_extraction_when_key_is_ambiguous_column() : void
270+
{
271+
$this->pgsqlDatabaseContext->createTable(
272+
to_dbal_schema_table(
273+
schema(
274+
int_schema('id', metadata: DbalMetadata::primaryKey()),
275+
str_schema('name', metadata: DbalMetadata::length(255)),
276+
),
277+
$table = 'flow_key_set_extractor_test_01',
278+
)
279+
);
280+
281+
$this->pgsqlDatabaseContext->createTable(
282+
to_dbal_schema_table(
283+
schema(
284+
int_schema('id', metadata: DbalMetadata::primaryKey()),
285+
int_schema('id_01'),
286+
str_schema('name', metadata: DbalMetadata::length(255)),
287+
),
288+
'flow_key_set_extractor_test_02',
289+
)
290+
);
291+
292+
for ($i = 1; $i <= 25; $i++) {
293+
$this->pgsqlDatabaseContext->insert($table, ['id' => $i, 'name' => 'name_' . $i]);
294+
295+
$this->pgsqlDatabaseContext->insert('flow_key_set_extractor_test_02', ['id' => $i, 'id_01' => $i, 'name' => 'name_' . $i]);
296+
}
297+
298+
$rows = data_frame()
299+
->extract(
300+
from_dbal_key_set_qb(
301+
$this->pgsqlDatabaseContext->connection(),
302+
$this->pgsqlDatabaseContext->connection()->createQueryBuilder()
303+
->from($table)
304+
->select('flow_key_set_extractor_test_01.id as id')
305+
->leftJoin(
306+
'flow_key_set_extractor_test_01',
307+
'flow_key_set_extractor_test_02',
308+
'flow_key_set_extractor_test_02',
309+
'flow_key_set_extractor_test_01.id = flow_key_set_extractor_test_02.id_01'
310+
),
311+
pagination_key_set(pagination_key_desc('flow_key_set_extractor_test_01.id'))
312+
)
313+
->withSchema(schema(int_schema('id')))
314+
->withPageSize(5)
315+
->withMaximum(5)
316+
)
317+
->fetch()
318+
->toArray();
319+
320+
self::assertSame([
321+
['id' => 25],
322+
['id' => 24],
323+
['id' => 23],
324+
['id' => 22],
325+
['id' => 21],
326+
], $rows);
327+
}
328+
329+
public function test_extraction_when_key_is_ambiguous_column_with_custom_key_column_alias_suffix() : void
330+
{
331+
$this->pgsqlDatabaseContext->createTable(
332+
to_dbal_schema_table(
333+
schema(
334+
int_schema('id', metadata: DbalMetadata::primaryKey()),
335+
str_schema('name', metadata: DbalMetadata::length(255)),
336+
),
337+
$table = 'flow_key_set_extractor_test_01',
338+
)
339+
);
340+
341+
$this->pgsqlDatabaseContext->createTable(
342+
to_dbal_schema_table(
343+
schema(
344+
int_schema('id', metadata: DbalMetadata::primaryKey()),
345+
int_schema('id_01'),
346+
str_schema('name', metadata: DbalMetadata::length(255)),
347+
),
348+
'flow_key_set_extractor_test_02',
349+
)
350+
);
351+
352+
for ($i = 1; $i <= 25; $i++) {
353+
$this->pgsqlDatabaseContext->insert($table, ['id' => $i, 'name' => 'name_' . $i]);
354+
355+
$this->pgsqlDatabaseContext->insert('flow_key_set_extractor_test_02', ['id' => $i, 'id_01' => $i, 'name' => 'name_' . $i]);
356+
}
357+
358+
$rows = data_frame()
359+
->extract(
360+
from_dbal_key_set_qb(
361+
$this->pgsqlDatabaseContext->connection(),
362+
$this->pgsqlDatabaseContext->connection()->createQueryBuilder()
363+
->from($table)
364+
->select('flow_key_set_extractor_test_01.id as id')
365+
->leftJoin(
366+
'flow_key_set_extractor_test_01',
367+
'flow_key_set_extractor_test_02',
368+
'flow_key_set_extractor_test_02',
369+
'flow_key_set_extractor_test_01.id = flow_key_set_extractor_test_02.id_01'
370+
),
371+
pagination_key_set(pagination_key_desc('flow_key_set_extractor_test_01.id'))
372+
)
373+
->withKeyAliasSuffix('_something_custom')
374+
->withSchema(schema(int_schema('id')))
375+
->withPageSize(5)
376+
->withMaximum(5)
377+
)
378+
->fetch()
379+
->toArray();
380+
381+
self::assertSame([
382+
['id' => 25],
383+
['id' => 24],
384+
['id' => 23],
385+
['id' => 22],
386+
['id' => 21],
387+
], $rows);
388+
}
389+
269390
public function test_throws_exception_for_empty_key_set() : void
270391
{
271392
$this->pgsqlDatabaseContext->createTable(

0 commit comments

Comments
 (0)