Skip to content

Commit 7aadb64

Browse files
committed
v0.0.65 Introduce deduplicate processor
1 parent 48f7910 commit 7aadb64

File tree

6 files changed

+71
-2
lines changed

6 files changed

+71
-2
lines changed

PROCESSORS.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -567,6 +567,23 @@ def filter_rows(condition=None, equals=tuple(), not_equals=tuple(), resources=No
567567
- `None` indicates operation should be done on all resources
568568
- The index of the resource in the package
569569

570+
#### deduplicate.py
571+
Deduplicates rows in resources based on the resources' primary key
572+
573+
`deduplicate` accepts a resource specifier.
574+
For each resource, it will output only unique rows (based on the values in the primary key fields). Rows with duplicate primary keys will be ignored.
575+
576+
```python
577+
def deduplicate(resources=None):
578+
pass
579+
```
580+
581+
- `resources`
582+
- A name of a resource to operate on
583+
- A regular expression matching resource names
584+
- A list of resource names
585+
- `None` indicates operation should be done on all resources
586+
- The index of the resource in the package
570587

571588
### Manipulate package
572589
#### update_package.py

dataflows/VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.0.64
1+
0.0.65

dataflows/helpers/iterable_loader.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ def handle_iterable(self):
8888
try:
8989
for x in self.iterable:
9090
if mode is None:
91-
assert isinstance(x, (dict, list)), 'Bad item %r' % x
91+
assert isinstance(x, (dict, list, tuple)), 'Bad item %r' % x
9292
mode = dict if isinstance(x, dict) else list
9393
assert isinstance(x, mode)
9494
if mode == dict:

dataflows/processors/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from .checkpoint import checkpoint
1010
from .concatenate import concatenate
1111
from .delete_fields import delete_fields
12+
from .deduplicate import deduplicate
1213
from .duplicate import duplicate
1314
from .filter_rows import filter_rows
1415
from .find_replace import find_replace
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from dataflows import PackageWrapper, ResourceWrapper
2+
3+
from ..helpers.resource_matcher import ResourceMatcher
4+
5+
6+
def deduper(rows: ResourceWrapper):
7+
pk = rows.res.descriptor['schema'].get('primaryKey', [])
8+
if len(pk) == 0:
9+
yield from rows
10+
else:
11+
keys = set()
12+
for row in rows:
13+
key = tuple(row[k] for k in pk)
14+
if key in keys:
15+
continue
16+
keys.add(key)
17+
yield row
18+
19+
20+
def deduplicate(resources=None):
21+
22+
def func(package: PackageWrapper):
23+
resource_matcher = ResourceMatcher(resources, package)
24+
yield package.pkg
25+
resource: ResourceWrapper
26+
for resource in package:
27+
if resource_matcher.match(resource.res.name):
28+
yield deduper(resource)
29+
else:
30+
yield resource
31+
return func

tests/test_lib.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,26 @@ def test_sort_rows_datetime():
471471
]
472472

473473

474+
def test_deduplicate():
475+
from dataflows import deduplicate, set_primary_key
476+
477+
a = [
478+
{'a': 1, 'b': 3, 'c': 'First'},
479+
{'a': 2, 'b': 3, 'c': 'First'},
480+
{'a': 1, 'b': 3, 'c': '!First'},
481+
{'a': 1, 'b': 2, 'c': 'First'},
482+
{'a': 2, 'b': 3, 'c': '!First'},
483+
]
484+
485+
f = Flow(
486+
a,
487+
set_primary_key(['a', 'b']),
488+
deduplicate(),
489+
)
490+
results, _, _ = f.results()
491+
assert set(x['c'] for x in results[0]) == {'First'}
492+
493+
474494
def test_duplicate():
475495
from dataflows import duplicate
476496

0 commit comments

Comments
 (0)