Skip to content

Commit bd9619e

Browse files
committed
reading limit for query adapter
1 parent 24fd441 commit bd9619e

File tree

6 files changed

+22
-11
lines changed

6 files changed

+22
-11
lines changed

src/Keboola/DynamoDbExtractor/Exporter.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public function export(): string
6969
$readingAdapter->read($params);
7070
} catch (DynamoDbException $e) {
7171
if ($e->getStatusCode() !== null && substr((string) $e->getStatusCode(), 0, 1) === '4') {
72-
throw new UserException((string) $e->getAwsErrorCode());
72+
throw new UserException((string) $e->getAwsErrorMessage());
7373
} else {
7474
throw $e;
7575
}

src/Keboola/DynamoDbExtractor/ReadingAdapter/QueryReadingAdapter.php

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
namespace Keboola\DynamoDbExtractor\ReadingAdapter;
66

77
use Aws\DynamoDb\Marshaler;
8+
use Keboola\DynamoDbExtractor\ReadingLimit;
89

910
class QueryReadingAdapter extends AbstractReadingAdapter
1011
{
@@ -19,8 +20,18 @@ public function read(array $params): void
1920
if (!empty($this->exportOptions['expressionAttributeNames'])) {
2021
$params['ExpressionAttributeNames'] = $this->exportOptions['expressionAttributeNames'];
2122
}
22-
$response = $this->dynamoDbClient->query($params);
2323

24-
$this->saveResponseItems($marshaler, (array) $response['Items']);
24+
$scanLimit = new ReadingLimit(1000, $this->exportOptions['limit'] ?? null);
25+
26+
do {
27+
if (isset($response, $response['LastEvaluatedKey'])) {
28+
$params['ExclusiveStartKey'] = $response['LastEvaluatedKey'];
29+
}
30+
$params['Limit'] = $scanLimit->getBatchSize();
31+
$response = $this->dynamoDbClient->query($params)->toArray();
32+
$scanLimit->decreaseLimit($response['Count']);
33+
34+
$this->saveResponseItems($marshaler, (array) $response['Items']);
35+
} while ($scanLimit->shouldContinue() && isset($response['LastEvaluatedKey']));
2536
}
2637
}

src/Keboola/DynamoDbExtractor/ReadingAdapter/ScanReadingAdapter.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
namespace Keboola\DynamoDbExtractor\ReadingAdapter;
66

77
use Aws\DynamoDb\Marshaler;
8-
use Keboola\DynamoDbExtractor\ScanLimit;
8+
use Keboola\DynamoDbExtractor\ReadingLimit;
99

1010
class ScanReadingAdapter extends AbstractReadingAdapter
1111
{
@@ -20,7 +20,7 @@ public function read(array $params): void
2020
$params = array_merge($params, $paramsFromDateFilter);
2121
}
2222

23-
$scanLimit = new ScanLimit(1000, $this->exportOptions['limit'] ?? null);
23+
$scanLimit = new ReadingLimit(1000, $this->exportOptions['limit'] ?? null);
2424

2525
$marshaler = new Marshaler();
2626

src/Keboola/DynamoDbExtractor/ScanLimit.php renamed to src/Keboola/DynamoDbExtractor/ReadingLimit.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
namespace Keboola\DynamoDbExtractor;
66

7-
class ScanLimit
7+
class ReadingLimit
88
{
99
/** The maximum number of items to evaluate (DynamoDB's "Limit" parameter) */
1010
private int $batchSize;

tests/Keboola/DynamoDbExtractor/RunFullExportUnknownTableTest.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public function testRun(): void
5959

6060
$this->assertSame(1, $exitCode);
6161
$this->assertStringContainsString(
62-
'ResourceNotFoundException',
62+
'Cannot do operations on a non-existent table',
6363
$commandTester->getDisplay()
6464
);
6565
}

tests/Keboola/DynamoDbExtractor/ScanLimitTest.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ class ScanLimitTest extends TestCase
1010
{
1111
public function testLimitNull(): void
1212
{
13-
$scanLimit = new ScanLimit(100);
13+
$scanLimit = new ReadingLimit(100);
1414

1515
$this->assertEquals(100, $scanLimit->getBatchSize());
1616
$this->assertTrue($scanLimit->shouldContinue());
@@ -23,7 +23,7 @@ public function testLimitNull(): void
2323

2424
public function testLimitEqualsBatchSize(): void
2525
{
26-
$scanLimit = new ScanLimit(100, 100);
26+
$scanLimit = new ReadingLimit(100, 100);
2727
$scanLimit->decreaseLimit(100);
2828

2929
$this->assertEquals(0, $scanLimit->getBatchSize());
@@ -32,7 +32,7 @@ public function testLimitEqualsBatchSize(): void
3232

3333
public function testLimitLessThanBatchSize(): void
3434
{
35-
$scanLimit = new ScanLimit(100, 10);
35+
$scanLimit = new ReadingLimit(100, 10);
3636

3737
$this->assertEquals(10, $scanLimit->getBatchSize());
3838
$this->assertTrue($scanLimit->shouldContinue());
@@ -45,7 +45,7 @@ public function testLimitLessThanBatchSize(): void
4545

4646
public function testLimitGreaterThanBatchSize(): void
4747
{
48-
$scanLimit = new ScanLimit(100, 223);
48+
$scanLimit = new ReadingLimit(100, 223);
4949
$scanLimit->decreaseLimit(100);
5050

5151
$this->assertEquals(100, $scanLimit->getBatchSize());

0 commit comments

Comments
 (0)