Skip to content

Commit 7e190b3

Browse files
authored
Merge pull request #25 from keboola/COM-1534-ondra
2 parents 3b1851d + 374e2ce commit 7e190b3

File tree

14 files changed

+581
-84
lines changed

14 files changed

+581
-84
lines changed

CONFIG.md

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Configuring in Keboola Connection UI
22

3-
## Sample
3+
## Sample scan mode
44

55

66
```json
@@ -23,6 +23,7 @@
2323
"title",
2424
"year"
2525
],
26+
"mode": "scan",
2627
"dateFilter": {
2728
"field": "year",
2829
"format": "Y",
@@ -39,6 +40,49 @@
3940
}
4041
```
4142

43+
44+
## Sample query mode
45+
46+
47+
```json
48+
{
49+
"db": {
50+
"endpoint": "endpoint",
51+
"accessKeyId": "access key id",
52+
"#secretAccessKey": "secret access key",
53+
"regionName": "eu-central-1"
54+
},
55+
"exports": [
56+
{
57+
"id": 1,
58+
"name": "my-movies",
59+
"table": "Movies",
60+
"index": "Movies_SomeIndex",
61+
"enabled": true,
62+
"incremental": true,
63+
"primaryKey": [
64+
"title",
65+
"year"
66+
],
67+
"mode": "query",
68+
"keyConditionExpression": "#yr = :a",
69+
"expressionAttributeNames": {
70+
"#yr": "year"
71+
},
72+
"expressionAttributeValues": {
73+
":a": "2013"
74+
},
75+
"limit": 100,
76+
"mapping": {
77+
"title": "title",
78+
"year": "year",
79+
"info.rating": "rating"
80+
}
81+
}
82+
]
83+
}
84+
```
85+
4286
## Description of `parameters`
4387

4488
- `db`: DynamoDB instance connection options
@@ -55,7 +99,11 @@
5599
- `enabled` (optional, default: `true`): if export is enabled or not (there has to be at least one enabled export)
56100
- `incremental`: if load of tables to storage will be incremental
57101
- `primaryKey`: primary key to set on imported table, defined as array
58-
- `dateFilter` (optional): how to filter scanned documents
102+
- `mode`: (optional): enum(scan|query) reading mode from dynamoDb - default is scan
103+
- `keyConditionExpression`: (required): provide a specific value for the partition key
104+
- `expressionAttributeValues`: (required): values that can be substituted in an expression
105+
- `expressionAttributeNames`: (optional): substitution tokens for attribute names in an expression
106+
- `dateFilter` (optional): how to filter scanned documents (only for scan mode)
59107
- `field`: field name in document by which you want to filter
60108
- `format`: date format (e.g. `Y-m-d` for date or `Y` for year)
61109
- `value`: date string from which date value will be created (e.g. `-2 days`)

README.md

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,26 +26,21 @@ Application is prepared for run in container, you can start development same way
2626
1. Clone this repository: `git clone git@github.com:keboola/dynamodb-extractor.git`
2727
2. Change directory: `cd dynamodb-extractor`
2828
3. Build services: `docker-compose build`
29-
4. Run tests `docker-compose run --rm app-tests` (runs `./tests.sh` script)
29+
4. Run tests `docker-compose run --rm app composer ci`
3030

3131
After seeing all tests green, continue:
3232

33-
1. Run service: `docker-compose run --rm app` (starts container with `bash`)
33+
1. Run service: `docker-compose run --rm app-tests` (starts container with `bash`)
3434
2. Create tables/indexes and load sample data: `php init.php`
3535
3. Write tests and code
36-
4. Run tests: `./tests.sh`
36+
4. Run tests: `composer tests`
3737

3838
To simulate real run:
3939

4040
1. Create data dir: `mkdir -p data`
4141
2. Follow configuration sample and create `config.json` file and place it to your data directory (`data/config.json`)
4242
3. Simulate real run (with entrypoint command): `php ./src/app.php run ./data`
4343

44-
### Tests
45-
46-
- all in one: `./tests.sh`
47-
- or separately, just check `tests.sh` file contents
48-
4944
## License
5045

5146
MIT licensed, see [LICENSE](./LICENSE) file.

src/Keboola/DynamoDbExtractor/ConfigDefinition.php

Lines changed: 46 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,22 @@
66

77
use Symfony\Component\Config\Definition\Builder\TreeBuilder;
88
use Symfony\Component\Config\Definition\ConfigurationInterface;
9+
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
910

1011
class ConfigDefinition implements ConfigurationInterface
1112
{
13+
public const MODE_SCAN = 'scan';
14+
15+
public const MODE_QUERY = 'query';
16+
17+
private const QUERY_INVALID_NODES = ['dateFilter'];
18+
19+
private const SCAN_INVALID_NODES = [
20+
'keyConditionExpression',
21+
'expressionAttributeNames',
22+
'expressionAttributeValues',
23+
];
24+
1225
public function getConfigTreeBuilder(): TreeBuilder
1326
{
1427
$treeBuilder = new TreeBuilder('parameters');
@@ -38,31 +51,45 @@ public function getConfigTreeBuilder(): TreeBuilder
3851
->end()
3952
->arrayNode('exports')
4053
->prototype('array')
54+
->validate()->always(function ($exportItem) {
55+
if ($exportItem['mode'] === self::MODE_QUERY) {
56+
foreach (self::QUERY_INVALID_NODES as $invalidNodes) {
57+
if (isset($exportItem[$invalidNodes])) {
58+
throw new InvalidConfigurationException(
59+
sprintf('Node "%s" is not allowed for query export.', $invalidNodes)
60+
);
61+
}
62+
}
63+
} else {
64+
foreach (self::SCAN_INVALID_NODES as $invalidNodes) {
65+
if (isset($exportItem[$invalidNodes])) {
66+
throw new InvalidConfigurationException(
67+
sprintf('Node "%s" is not allowed for scan export.', $invalidNodes)
68+
);
69+
}
70+
}
71+
}
72+
return $exportItem;
73+
})->end()
4174
->children()
42-
->integerNode('id')
43-
->isRequired()
44-
->end()
75+
->integerNode('id')->isRequired()->end()
4576
->scalarNode('name')
4677
->isRequired()
4778
->cannotBeEmpty()
4879
->end()
49-
->scalarNode('table')
50-
->isRequired()
51-
->cannotBeEmpty()
52-
->end()
53-
->scalarNode('index')
54-
->cannotBeEmpty()
55-
->end()
56-
->integerNode('limit')
57-
->end()
58-
->variableNode('dateFilter')
59-
->end()
60-
->booleanNode('enabled')
61-
->defaultValue(true)
62-
->end()
63-
->booleanNode('incremental')
64-
->isRequired()
80+
->enumNode('mode')
81+
->values([self::MODE_SCAN, self::MODE_QUERY])
82+
->defaultValue(self::MODE_SCAN)
6583
->end()
84+
->scalarNode('table')->isRequired()->cannotBeEmpty()->end()
85+
->scalarNode('index')->cannotBeEmpty()->end()
86+
->integerNode('limit')->end()
87+
->variableNode('dateFilter')->end()
88+
->scalarNode('keyConditionExpression')->end()
89+
->variableNode('expressionAttributeValues')->end()
90+
->variableNode('expressionAttributeNames')->end()
91+
->booleanNode('enabled')->defaultValue(true)->end()
92+
->booleanNode('incremental')->isRequired()->end()
6693
->arrayNode('primaryKey')
6794
->scalarPrototype()
6895
->end()

src/Keboola/DynamoDbExtractor/Exporter.php

Lines changed: 20 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66

77
use Aws\DynamoDb\DynamoDbClient;
88
use Aws\DynamoDb\Exception\DynamoDbException;
9-
use Aws\DynamoDb\Marshaler;
9+
use Keboola\DynamoDbExtractor\ReadingAdapter\QueryReadingAdapter;
10+
use Keboola\DynamoDbExtractor\ReadingAdapter\ScanReadingAdapter;
1011
use Nette\Utils\Strings;
1112
use Symfony\Component\Console\Output\OutputInterface;
1213
use Symfony\Component\Filesystem\Filesystem;
@@ -43,42 +44,32 @@ public function __construct(
4344
*/
4445
public function export(): string
4546
{
46-
$marshaler = new Marshaler();
47-
4847
$params = [
4948
'TableName' => $this->exportOptions['table'],
5049
];
5150

52-
if (isset($this->exportOptions['index'])) {
53-
$params['IndexName'] = $this->exportOptions['index'];
54-
}
55-
56-
if (isset($this->exportOptions['dateFilter'])) {
57-
$paramsFromDateFilter = $this->createParamsFromDateFilter($this->exportOptions['dateFilter']);
58-
$this->consoleOutput->writeln((string) json_encode($paramsFromDateFilter));
59-
$params = array_merge($params, $paramsFromDateFilter);
51+
switch ($this->exportOptions['mode']) {
52+
case ConfigDefinition::MODE_QUERY:
53+
$readingAdapter = new QueryReadingAdapter(
54+
$this->exportOptions,
55+
$this->dynamoDbClient,
56+
$this->consoleOutput,
57+
$this->filename
58+
);
59+
break;
60+
default:
61+
$readingAdapter = new ScanReadingAdapter(
62+
$this->exportOptions,
63+
$this->dynamoDbClient,
64+
$this->consoleOutput,
65+
$this->filename
66+
);
6067
}
61-
62-
$scanLimit = new ScanLimit(1000, $this->exportOptions['limit'] ?? null);
63-
6468
try {
65-
do {
66-
if (isset($response, $response['LastEvaluatedKey'])) {
67-
$params['ExclusiveStartKey'] = $response['LastEvaluatedKey'];
68-
}
69-
$params['Limit'] = $scanLimit->getBatchSize();
70-
$response = $this->dynamoDbClient->scan($params)->toArray();
71-
$scanLimit->decreaseLimit($response['Count']);
72-
73-
/** @var array $item */
74-
foreach ((array) $response['Items'] as $item) {
75-
$json = json_encode($marshaler->unmarshalItem($item));
76-
FileHelper::appendContentToFile($this->filename, $json . "\n");
77-
}
78-
} while ($scanLimit->shouldContinue() && isset($response['LastEvaluatedKey']));
69+
$readingAdapter->read($params);
7970
} catch (DynamoDbException $e) {
8071
if ($e->getStatusCode() !== null && substr((string) $e->getStatusCode(), 0, 1) === '4') {
81-
throw new UserException((string) $e->getAwsErrorCode());
72+
throw new UserException((string) $e->getAwsErrorMessage());
8273
} else {
8374
throw $e;
8475
}
@@ -102,22 +93,4 @@ public function cleanup(): void
10293
{
10394
$this->filesystem->remove($this->filename);
10495
}
105-
106-
/**
107-
* Creates filtering params from date filter
108-
*/
109-
private function createParamsFromDateFilter(array $dateFilter): array
110-
{
111-
return [
112-
'FilterExpression' => '#field >= :value',
113-
'ExpressionAttributeNames' => [
114-
'#field' => $dateFilter['field'],
115-
],
116-
'ExpressionAttributeValues' => [
117-
':value' => [
118-
'S' => date($dateFilter['format'], strtotime($dateFilter['value'])),
119-
],
120-
],
121-
];
122-
}
12396
}

src/Keboola/DynamoDbExtractor/Parser.php

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
use Keboola\CsvMap\Mapper;
99
use Keboola\CsvTable\Table;
1010
use Nette\Utils\Strings;
11-
use SplFileInfo;
1211
use Symfony\Component\Console\Output\OutputInterface;
1312
use Symfony\Component\Filesystem\Filesystem;
1413

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Keboola\DynamoDbExtractor\ReadingAdapter;
6+
7+
use Aws\DynamoDb\DynamoDbClient;
8+
use Aws\DynamoDb\Marshaler;
9+
use Keboola\DynamoDbExtractor\FileHelper;
10+
use Symfony\Component\Console\Output\OutputInterface;
11+
12+
abstract class AbstractReadingAdapter
13+
{
14+
protected array $exportOptions;
15+
16+
protected DynamoDbClient $dynamoDbClient;
17+
18+
protected OutputInterface $consoleOutput;
19+
20+
protected string $filename;
21+
22+
public function __construct(
23+
array $exportOptions,
24+
DynamoDbClient $dynamoDbClient,
25+
OutputInterface $consoleOutput,
26+
string $filename
27+
) {
28+
$this->exportOptions = $exportOptions;
29+
$this->dynamoDbClient = $dynamoDbClient;
30+
$this->consoleOutput = $consoleOutput;
31+
$this->filename = $filename;
32+
}
33+
34+
abstract public function read(array $params): void;
35+
36+
protected function saveResponseItems(Marshaler $marshaler, array $items): void
37+
{
38+
/** @var array $item */
39+
foreach ($items as $item) {
40+
$json = json_encode($marshaler->unmarshalItem($item));
41+
FileHelper::appendContentToFile($this->filename, $json . "\n");
42+
}
43+
}
44+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Keboola\DynamoDbExtractor\ReadingAdapter;
6+
7+
use Aws\DynamoDb\Marshaler;
8+
use Keboola\DynamoDbExtractor\ReadingLimit;
9+
10+
class QueryReadingAdapter extends AbstractReadingAdapter
11+
{
12+
public function read(array $params): void
13+
{
14+
$marshaler = new Marshaler();
15+
16+
if (!empty($this->exportOptions['keyConditionExpression'])) {
17+
$params['KeyConditionExpression'] = $this->exportOptions['keyConditionExpression'];
18+
}
19+
if (!empty($this->exportOptions['expressionAttributeValues'])) {
20+
$params['ExpressionAttributeValues'] = $marshaler->marshalJson(
21+
(string) json_encode($this->exportOptions['expressionAttributeValues'])
22+
);
23+
}
24+
if (!empty($this->exportOptions['expressionAttributeNames'])) {
25+
$params['ExpressionAttributeNames'] = $this->exportOptions['expressionAttributeNames'];
26+
}
27+
28+
$scanLimit = new ReadingLimit(1000, $this->exportOptions['limit'] ?? null);
29+
30+
do {
31+
if (isset($response, $response['LastEvaluatedKey'])) {
32+
$params['ExclusiveStartKey'] = $response['LastEvaluatedKey'];
33+
}
34+
$params['Limit'] = $scanLimit->getBatchSize();
35+
$response = $this->dynamoDbClient->query($params)->toArray();
36+
$scanLimit->decreaseLimit($response['Count']);
37+
38+
$this->saveResponseItems($marshaler, (array) $response['Items']);
39+
} while ($scanLimit->shouldContinue() && isset($response['LastEvaluatedKey']));
40+
}
41+
}

0 commit comments

Comments
 (0)