Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@ final class PostgreSqlKeySetExtractor implements Extractor

private ?Schema $schema = null;

/**
* @param array<int, mixed> $parameters
*/
public function __construct(
private readonly Client $client,
private readonly string|SqlQuery $query,
private readonly KeySet $keySet,
private readonly array $parameters = [],
) {
}

Expand All @@ -38,7 +42,7 @@ public function extract(FlowContext $context) : \Generator
while (true) {
$paginatedSql = $this->applyKeysetPagination($sql, $this->pageSize, $cursorValues);

$cursor = $this->client->cursor($paginatedSql, $cursorValues ?? []);
$cursor = $this->client->cursor($paginatedSql, \array_merge($this->parameters, $cursorValues ?? []));

$hasRows = false;
$lastRow = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@ final class PostgreSqlLimitOffsetExtractor implements Extractor

private ?Schema $schema = null;

/**
* @param array<int, mixed> $parameters
*/
public function __construct(
private readonly Client $client,
private readonly string|SqlQuery $query,
private readonly array $parameters = [],
) {
}

Expand Down Expand Up @@ -50,7 +54,7 @@ public function extract(FlowContext $context) : \Generator

$paginatedSql = $this->applyPagination($sql, $this->pageSize, $offset);

$cursor = $this->client->cursor($paginatedSql);
$cursor = $this->client->cursor($paginatedSql, $this->parameters);

foreach ($cursor->iterate() as $row) {
$signal = yield array_to_rows($row, $context->entryFactory(), [], $this->schema);
Expand Down Expand Up @@ -110,6 +114,6 @@ private function applyPagination(string $sql, int $limit, int $offset) : string

private function countTotal(string $sql) : int
{
return $this->client->fetchScalarInt(sql_to_count_query($sql));
return $this->client->fetchScalarInt(sql_to_count_query($sql), $this->parameters);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,60 +21,54 @@
* @param Client $client PostgreSQL client
* @param SqlQuery|string $query SQL query to execute (wrapped in DECLARE CURSOR)
* @param array<int, mixed> $parameters Positional parameters for the query
* @param int $fetchSize Number of rows to fetch per batch (default: 1000)
* @param null|int $maximum Maximum number of rows to extract (null for unlimited)
*/
#[DocumentationDSL(module: Module::POSTGRESQL, type: DSLType::EXTRACTOR)]
function from_pgsql_cursor(
Client $client,
string|SqlQuery $query,
array $parameters = [],
int $fetchSize = 1000,
?int $maximum = null,
) : PostgreSqlCursorExtractor {
$extractor = (new PostgreSqlCursorExtractor($client, $query, $parameters))
->withFetchSize($fetchSize);

if ($maximum !== null) {
$extractor->withMaximum($maximum);
}

return $extractor;
return new PostgreSqlCursorExtractor($client, $query, $parameters);
}

/**
* Create a PostgreSQL extractor using LIMIT/OFFSET pagination.
*
* Suitable for smaller datasets. For large datasets, consider using keyset pagination
* (from_pgsql_key_set) which is more efficient.
*
* @param Client $client PostgreSQL client
* @param SqlQuery|string $query SQL query to execute (must have ORDER BY clause)
* @param array<int, mixed> $parameters Positional parameters for the query
*/
#[DocumentationDSL(module: Module::POSTGRESQL, type: DSLType::EXTRACTOR)]
function from_pgsql_limit_offset(
Client $client,
string|SqlQuery $query,
int $pageSize = 1000,
?int $maximum = null,
array $parameters = [],
) : PostgreSqlLimitOffsetExtractor {
$extractor = (new PostgreSqlLimitOffsetExtractor($client, $query))
->withPageSize($pageSize);

if ($maximum !== null) {
$extractor->withMaximum($maximum);
}

return $extractor;
return new PostgreSqlLimitOffsetExtractor($client, $query, $parameters);
}

/**
* Create a PostgreSQL extractor using keyset (cursor-based) pagination.
*
* More efficient than LIMIT/OFFSET for large datasets - uses indexed WHERE conditions
* instead of skipping rows.
*
* @param Client $client PostgreSQL client
* @param SqlQuery|string $query SQL query to execute (must have ORDER BY matching keyset columns)
* @param KeySet $keySet Columns to use for keyset pagination
* @param array<int, mixed> $parameters Positional parameters for the query
*/
#[DocumentationDSL(module: Module::POSTGRESQL, type: DSLType::EXTRACTOR)]
function from_pgsql_key_set(
Client $client,
string|SqlQuery $query,
KeySet $keySet,
int $pageSize = 1000,
?int $maximum = null,
array $parameters = [],
) : PostgreSqlKeySetExtractor {
$extractor = (new PostgreSqlKeySetExtractor($client, $query, $keySet))
->withPageSize($pageSize);

if ($maximum !== null) {
$extractor->withMaximum($maximum);
}

return $extractor;
return new PostgreSqlKeySetExtractor($client, $query, $keySet, $parameters);
}

#[DocumentationDSL(module: Module::POSTGRESQL, type: DSLType::HELPER)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ public function bench_extract_10k_cursor() : void
foreach (from_pgsql_cursor(
$this->client,
select(star())->from(table(self::TABLE_NAME)),
fetchSize: 1000
)->extract($context) as $rows) {
)->withFetchSize(1000)->extract($context) as $rows) {
}
}

Expand All @@ -70,8 +69,7 @@ public function bench_extract_10k_keyset() : void
$this->client,
select(star())->from(table(self::TABLE_NAME)),
pgsql_pagination_key_set(pgsql_pagination_key_asc('index')),
pageSize: 1000
)->extract($context) as $rows) {
)->withPageSize(1000)->extract($context) as $rows) {
}
}

Expand All @@ -82,8 +80,7 @@ public function bench_extract_10k_limit_offset() : void
foreach (from_pgsql_limit_offset(
$this->client,
select(star())->from(table(self::TABLE_NAME))->orderBy(asc(col('index'))),
pageSize: 1000
)->extract($context) as $rows) {
)->withPageSize(1000)->extract($context) as $rows) {
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ public function test_extracts_all_rows_with_cursor() : void
->read(from_pgsql_cursor(
$this->client,
select(col('id'), col('name'))->from(table($this->tableName))->orderBy(asc(col('id'))),
fetchSize: 5
))
)->withFetchSize(5))
->fetch()
->toArray();

Expand All @@ -64,8 +63,7 @@ public function test_extracts_all_rows_without_order_by() : void
->read(from_pgsql_cursor(
$this->client,
select(col('id'), col('name'))->from(table($this->tableName)),
fetchSize: 5
))
)->withFetchSize(5))
->fetch()
->toArray();

Expand All @@ -78,9 +76,7 @@ public function test_extracts_limited_rows_with_maximum() : void
->read(from_pgsql_cursor(
$this->client,
select(col('id'), col('name'))->from(table($this->tableName))->orderBy(asc(col('id'))),
fetchSize: 5,
maximum: 12
))
)->withFetchSize(5)->withMaximum(12))
->fetch()
->toArray();

Expand All @@ -96,8 +92,7 @@ public function test_extracts_with_custom_cursor_name() : void
->read(from_pgsql_cursor(
$this->client,
select(col('id'), col('name'))->from(table($this->tableName))->orderBy(asc(col('id'))),
fetchSize: 5
)->withCursorName('my_custom_cursor'))
)->withFetchSize(5)->withCursorName('my_custom_cursor'))
->fetch()
->toArray();

Expand All @@ -113,8 +108,7 @@ public function test_extracts_with_custom_fetch_size() : void
->read(from_pgsql_cursor(
$this->client,
select(col('id'), col('name'))->from(table($this->tableName))->orderBy(asc(col('id'))),
fetchSize: 3
))
)->withFetchSize(3))
->fetch()
->toArray();

Expand All @@ -130,9 +124,8 @@ public function test_extracts_with_parameterized_query() : void
->read(from_pgsql_cursor(
$this->client,
'SELECT id, name FROM ' . $this->tableName . ' WHERE id > $1 ORDER BY id',
parameters: [10],
fetchSize: 5
))
[10],
)->withFetchSize(5))
->fetch()
->toArray();

Expand All @@ -148,8 +141,7 @@ public function test_extracts_with_raw_sql() : void
->read(from_pgsql_cursor(
$this->client,
'SELECT id, name FROM ' . $this->tableName . ' ORDER BY id',
fetchSize: 5
))
)->withFetchSize(5))
->fetch()
->toArray();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ public function test_extracts_all_rows_with_keyset_pagination() : void
$this->client,
select(col('id'), col('name'))->from(table($this->tableName)),
pgsql_pagination_key_set(pgsql_pagination_key_asc('id')),
pageSize: 5
))
)->withPageSize(5))
->fetch()
->toArray();

Expand All @@ -66,9 +65,7 @@ public function test_extracts_limited_rows_with_maximum() : void
$this->client,
select(col('id'), col('name'))->from(table($this->tableName)),
pgsql_pagination_key_set(pgsql_pagination_key_asc('id')),
pageSize: 5,
maximum: 12
))
)->withPageSize(5)->withMaximum(12))
->fetch()
->toArray();

Expand All @@ -85,8 +82,7 @@ public function test_extracts_with_descending_order() : void
$this->client,
select(col('id'), col('name'))->from(table($this->tableName)),
pgsql_pagination_key_set(pgsql_pagination_key_desc('id')),
pageSize: 5
))
)->withPageSize(5))
->fetch()
->toArray();

Expand All @@ -96,15 +92,32 @@ public function test_extracts_with_descending_order() : void
self::assertSame(\range(25, 1), \array_column($rows, 'id'));
}

public function test_extracts_with_multiple_positional_parameters() : void
{
$rows = df()
->read(from_pgsql_key_set(
$this->client,
'SELECT id, name FROM ' . $this->tableName . ' WHERE id >= $1 AND id <= $2',
pgsql_pagination_key_set(pgsql_pagination_key_asc('id')),
[5, 15],
)->withPageSize(3))
->fetch()
->toArray();

self::assertCount(11, $rows);
self::assertSame(5, $rows[0]['id']);
self::assertSame(15, $rows[10]['id']);
self::assertSame(\range(5, 15), \array_column($rows, 'id'));
}

public function test_extracts_with_raw_sql() : void
{
$rows = df()
->read(from_pgsql_key_set(
$this->client,
'SELECT id, name FROM ' . $this->tableName,
pgsql_pagination_key_set(pgsql_pagination_key_asc('id')),
pageSize: 5
))
)->withPageSize(5))
->fetch()
->toArray();

Expand All @@ -114,6 +127,42 @@ public function test_extracts_with_raw_sql() : void
self::assertSame(\range(1, 25), \array_column($rows, 'id'));
}

public function test_extracts_with_single_positional_parameter() : void
{
$rows = df()
->read(from_pgsql_key_set(
$this->client,
'SELECT id, name FROM ' . $this->tableName . ' WHERE id > $1',
pgsql_pagination_key_set(pgsql_pagination_key_asc('id')),
[10],
)->withPageSize(5))
->fetch()
->toArray();

self::assertCount(15, $rows);
self::assertSame(11, $rows[0]['id']);
self::assertSame(25, $rows[14]['id']);
self::assertSame(\range(11, 25), \array_column($rows, 'id'));
}

public function test_extracts_with_three_positional_parameters() : void
{
$rows = df()
->read(from_pgsql_key_set(
$this->client,
'SELECT id, name FROM ' . $this->tableName . ' WHERE id >= $1 AND id <= $2 AND name LIKE $3',
pgsql_pagination_key_set(pgsql_pagination_key_asc('id')),
[5, 15, 'User_%'],
)->withPageSize(3))
->fetch()
->toArray();

self::assertCount(11, $rows);
self::assertSame(5, $rows[0]['id']);
self::assertSame(15, $rows[10]['id']);
self::assertSame(\range(5, 15), \array_column($rows, 'id'));
}

public function test_returns_empty_for_empty_table() : void
{
$this->client->execute(delete()->from($this->tableName));
Expand All @@ -123,8 +172,7 @@ public function test_returns_empty_for_empty_table() : void
$this->client,
select(star())->from(table($this->tableName)),
pgsql_pagination_key_set(pgsql_pagination_key_asc('id')),
pageSize: 10
))
)->withPageSize(10))
->fetch()
->toArray();

Expand Down
Loading
Loading