Skip to content

Commit 7e1c755

Browse files
authored
refactor: optimized pgsql cursor extractor (#2136)
- added: normalize() and fromArray() methods to PostgreSQL Explain Plan classes (Cost, Timing, Buffers, PlanNode, Plan, ExplainConfig) for serialization/deserialization support - added: PHPStan type aliases (PlanNodeShape, TimingShape, BuffersShape) for improved type documentation - changed: Optimized PostgreSqlCursorExtractor to break cursor loop early when fetched rows are less than fetch size, avoiding unnecessary database round-trips
1 parent d31df46 commit 7e1c755

File tree

14 files changed

+1283
-5
lines changed

14 files changed

+1283
-5
lines changed

src/adapter/etl-adapter-postgresql/src/Flow/ETL/Adapter/PostgreSql/PostgreSqlCursorExtractor.php

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,15 @@ public function extract(FlowContext $context) : \Generator
6161

6262
while (true) {
6363
$cursor = $this->client->cursor(fetch($cursorName)->forward($this->fetchSize));
64-
$hasRows = false;
64+
$rowCount = $cursor->count();
65+
66+
if ($rowCount === 0) {
67+
$cursor->free();
68+
69+
break;
70+
}
6571

6672
foreach ($cursor->iterate() as $row) {
67-
$hasRows = true;
6873
$signal = yield array_to_rows($row, $context->entryFactory(), [], $this->schema);
6974

7075
if ($signal === Signal::STOP) {
@@ -84,7 +89,7 @@ public function extract(FlowContext $context) : \Generator
8489

8590
$cursor->free();
8691

87-
if (!$hasRows) {
92+
if ($rowCount < $this->fetchSize) {
8893
break;
8994
}
9095
}

src/adapter/etl-adapter-postgresql/tests/Flow/ETL/Adapter/PostgreSql/Tests/Unit/PostgreSqlCursorExtractorTest.php

Lines changed: 184 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,165 @@
55
namespace Flow\ETL\Adapter\PostgreSql\Tests\Unit;
66

77
use Flow\ETL\Adapter\PostgreSql\PostgreSqlCursorExtractor;
8+
use Flow\ETL\{Config, FlowContext, Schema};
89
use Flow\ETL\Exception\InvalidArgumentException;
9-
use Flow\ETL\Schema;
1010
use Flow\ETL\Tests\FlowTestCase;
11-
use Flow\PostgreSql\Client\Client;
11+
use Flow\PostgreSql\Client\{Client, Cursor};
1212
use PHPUnit\Framework\MockObject\MockObject;
1313

1414
final class PostgreSqlCursorExtractorTest extends FlowTestCase
1515
{
16+
public function test_cursor_loop_breaks_immediately_when_empty_result() : void
17+
{
18+
$client = $this->createClientMock();
19+
$cursor = $this->createCursorMock(rows: [], count: 0);
20+
21+
$client->expects(self::once())
22+
->method('getTransactionNestingLevel')
23+
->willReturn(1);
24+
25+
$client->expects(self::exactly(2))
26+
->method('execute');
27+
28+
$client->expects(self::once())
29+
->method('cursor')
30+
->willReturn($cursor);
31+
32+
$extractor = new PostgreSqlCursorExtractor($client, 'SELECT * FROM users');
33+
$extractor = $extractor->withFetchSize(10);
34+
35+
$rows = [];
36+
37+
foreach ($extractor->extract($this->createFlowContext()) as $rowsData) {
38+
$rows[] = $rowsData;
39+
}
40+
41+
self::assertSame([], $rows);
42+
}
43+
44+
public function test_cursor_loop_breaks_when_rows_less_than_fetch_size() : void
45+
{
46+
$client = $this->createClientMock();
47+
48+
$cursor = $this->createCursorMock(
49+
rows: [
50+
['id' => 1, 'name' => 'User 1'],
51+
['id' => 2, 'name' => 'User 2'],
52+
['id' => 3, 'name' => 'User 3'],
53+
],
54+
count: 3
55+
);
56+
57+
$client->expects(self::once())
58+
->method('getTransactionNestingLevel')
59+
->willReturn(1);
60+
61+
$client->expects(self::exactly(2))
62+
->method('execute');
63+
64+
$client->expects(self::once())
65+
->method('cursor')
66+
->willReturn($cursor);
67+
68+
$extractor = new PostgreSqlCursorExtractor($client, 'SELECT * FROM users');
69+
$extractor = $extractor->withFetchSize(10);
70+
71+
$rows = [];
72+
73+
foreach ($extractor->extract($this->createFlowContext()) as $rowsData) {
74+
$rows = [...$rows, ...$rowsData->toArray()];
75+
}
76+
77+
self::assertCount(3, $rows);
78+
}
79+
80+
public function test_cursor_loop_fetches_multiple_batches_when_needed() : void
81+
{
82+
$client = $this->createClientMock();
83+
84+
$cursor1 = $this->createCursorMock(
85+
rows: [
86+
['id' => 1, 'name' => 'User 1'],
87+
['id' => 2, 'name' => 'User 2'],
88+
],
89+
count: 2
90+
);
91+
92+
$cursor2 = $this->createCursorMock(
93+
rows: [
94+
['id' => 3, 'name' => 'User 3'],
95+
],
96+
count: 1
97+
);
98+
99+
$client->expects(self::once())
100+
->method('getTransactionNestingLevel')
101+
->willReturn(1);
102+
103+
$client->expects(self::exactly(2))
104+
->method('execute');
105+
106+
$client->expects(self::exactly(2))
107+
->method('cursor')
108+
->willReturnOnConsecutiveCalls($cursor1, $cursor2);
109+
110+
$extractor = new PostgreSqlCursorExtractor($client, 'SELECT * FROM users');
111+
$extractor = $extractor->withFetchSize(2);
112+
113+
$rows = [];
114+
115+
foreach ($extractor->extract($this->createFlowContext()) as $rowsData) {
116+
$rows = [...$rows, ...$rowsData->toArray()];
117+
}
118+
119+
self::assertCount(3, $rows);
120+
}
121+
122+
public function test_cursor_loop_with_exact_fetch_size_multiple_does_extra_fetch() : void
123+
{
124+
$client = $this->createClientMock();
125+
126+
$cursor1 = $this->createCursorMock(
127+
rows: [
128+
['id' => 1, 'name' => 'User 1'],
129+
['id' => 2, 'name' => 'User 2'],
130+
],
131+
count: 2
132+
);
133+
134+
$cursor2 = $this->createCursorMock(
135+
rows: [
136+
['id' => 3, 'name' => 'User 3'],
137+
['id' => 4, 'name' => 'User 4'],
138+
],
139+
count: 2
140+
);
141+
142+
$cursor3 = $this->createCursorMock(rows: [], count: 0);
143+
144+
$client->expects(self::once())
145+
->method('getTransactionNestingLevel')
146+
->willReturn(1);
147+
148+
$client->expects(self::exactly(2))
149+
->method('execute');
150+
151+
$client->expects(self::exactly(3))
152+
->method('cursor')
153+
->willReturnOnConsecutiveCalls($cursor1, $cursor2, $cursor3);
154+
155+
$extractor = new PostgreSqlCursorExtractor($client, 'SELECT * FROM users');
156+
$extractor = $extractor->withFetchSize(2);
157+
158+
$rows = [];
159+
160+
foreach ($extractor->extract($this->createFlowContext()) as $rowsData) {
161+
$rows = [...$rows, ...$rowsData->toArray()];
162+
}
163+
164+
self::assertCount(4, $rows);
165+
}
166+
16167
public function test_with_fetch_size_returns_self() : void
17168
{
18169
$client = $this->createClientMock();
@@ -95,4 +246,35 @@ private function createClientMock() : Client
95246
{
96247
return $this->createMock(Client::class);
97248
}
249+
250+
/**
251+
* @param array<array<string, mixed>> $rows
252+
*
253+
* @return Cursor&MockObject
254+
*/
255+
private function createCursorMock(array $rows, int $count) : Cursor
256+
{
257+
$cursor = $this->createMock(Cursor::class);
258+
259+
$cursor->expects(self::once())
260+
->method('count')
261+
->willReturn($count);
262+
263+
$cursor->method('iterate')
264+
->willReturnCallback(function () use ($rows) : \Generator {
265+
foreach ($rows as $row) {
266+
yield $row;
267+
}
268+
});
269+
270+
$cursor->expects(self::once())
271+
->method('free');
272+
273+
return $cursor;
274+
}
275+
276+
private function createFlowContext() : FlowContext
277+
{
278+
return new FlowContext(Config::default());
279+
}
98280
}

src/lib/postgresql/src/Flow/PostgreSql/AST/Transformers/ExplainConfig.php

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,66 @@ public static function forEstimate() : self
5656
);
5757
}
5858

59+
/**
60+
* @param array{
61+
* analyze: bool,
62+
* verbose: bool,
63+
* costs: bool,
64+
* buffers: bool,
65+
* timing: bool,
66+
* summary: bool,
67+
* memory: bool,
68+
* settings: bool,
69+
* wal: bool,
70+
* format: string
71+
* } $data
72+
*/
73+
public static function fromArray(array $data) : self
74+
{
75+
return new self(
76+
analyze: $data['analyze'],
77+
verbose: $data['verbose'],
78+
costs: $data['costs'],
79+
buffers: $data['buffers'],
80+
timing: $data['timing'],
81+
summary: $data['summary'],
82+
memory: $data['memory'],
83+
settings: $data['settings'],
84+
wal: $data['wal'],
85+
format: ExplainFormat::from($data['format']),
86+
);
87+
}
88+
89+
/**
90+
* @return array{
91+
* analyze: bool,
92+
* verbose: bool,
93+
* costs: bool,
94+
* buffers: bool,
95+
* timing: bool,
96+
* summary: bool,
97+
* memory: bool,
98+
* settings: bool,
99+
* wal: bool,
100+
* format: string
101+
* }
102+
*/
103+
public function normalize() : array
104+
{
105+
return [
106+
'analyze' => $this->analyze,
107+
'verbose' => $this->verbose,
108+
'costs' => $this->costs,
109+
'buffers' => $this->buffers,
110+
'timing' => $this->timing,
111+
'summary' => $this->summary,
112+
'memory' => $this->memory,
113+
'settings' => $this->settings,
114+
'wal' => $this->wal,
115+
'format' => $this->format->value,
116+
];
117+
}
118+
59119
public function withAnalyze() : self
60120
{
61121
return new self(

src/lib/postgresql/src/Flow/PostgreSql/Explain/Plan/Buffers.php

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44

55
namespace Flow\PostgreSql\Explain\Plan;
66

7+
/**
8+
* @phpstan-type BuffersShape = array{shared_hit: int, shared_read: int, shared_dirtied: int, shared_written: int, local_hit: int, local_read: int, local_dirtied: int, local_written: int, temp_read: int, temp_written: int}
9+
*/
710
final readonly class Buffers
811
{
912
public function __construct(
@@ -20,6 +23,25 @@ public function __construct(
2023
) {
2124
}
2225

26+
/**
27+
* @param BuffersShape $data
28+
*/
29+
public static function fromArray(array $data) : self
30+
{
31+
return new self(
32+
sharedHit: $data['shared_hit'],
33+
sharedRead: $data['shared_read'],
34+
sharedDirtied: $data['shared_dirtied'],
35+
sharedWritten: $data['shared_written'],
36+
localHit: $data['local_hit'],
37+
localRead: $data['local_read'],
38+
localDirtied: $data['local_dirtied'],
39+
localWritten: $data['local_written'],
40+
tempRead: $data['temp_read'],
41+
tempWritten: $data['temp_written'],
42+
);
43+
}
44+
2345
public function hasDiskSpill() : bool
2446
{
2547
return $this->tempRead > 0 || $this->tempWritten > 0;
@@ -52,6 +74,25 @@ public function localWritten() : int
5274
return $this->localWritten;
5375
}
5476

77+
/**
78+
* @return BuffersShape
79+
*/
80+
public function normalize() : array
81+
{
82+
return [
83+
'shared_hit' => $this->sharedHit,
84+
'shared_read' => $this->sharedRead,
85+
'shared_dirtied' => $this->sharedDirtied,
86+
'shared_written' => $this->sharedWritten,
87+
'local_hit' => $this->localHit,
88+
'local_read' => $this->localRead,
89+
'local_dirtied' => $this->localDirtied,
90+
'local_written' => $this->localWritten,
91+
'temp_read' => $this->tempRead,
92+
'temp_written' => $this->tempWritten,
93+
];
94+
}
95+
5596
public function sharedDirtied() : int
5697
{
5798
return $this->sharedDirtied;

src/lib/postgresql/src/Flow/PostgreSql/Explain/Plan/Cost.php

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,33 @@ public function __construct(
1212
) {
1313
}
1414

15+
/**
16+
* @param array{startup_cost: float, total_cost: float} $data
17+
*/
18+
public static function fromArray(array $data) : self
19+
{
20+
return new self(
21+
startupCost: $data['startup_cost'],
22+
totalCost: $data['total_cost'],
23+
);
24+
}
25+
1526
public function incrementalCost() : float
1627
{
1728
return $this->totalCost - $this->startupCost;
1829
}
1930

31+
/**
32+
* @return array{startup_cost: float, total_cost: float}
33+
*/
34+
public function normalize() : array
35+
{
36+
return [
37+
'startup_cost' => $this->startupCost,
38+
'total_cost' => $this->totalCost,
39+
];
40+
}
41+
2042
public function startupCost() : float
2143
{
2244
return $this->startupCost;

0 commit comments

Comments
 (0)