diff --git a/dataflows/processors/__init__.py b/dataflows/processors/__init__.py index 0cd73d4..d7104e7 100644 --- a/dataflows/processors/__init__.py +++ b/dataflows/processors/__init__.py @@ -25,6 +25,7 @@ from .stream import stream from .unpivot import unpivot from .unstream import unstream +from .unwind import unwind from .update_package import update_package, add_metadata from .update_resource import update_resource from .update_schema import update_schema diff --git a/dataflows/processors/unwind.py b/dataflows/processors/unwind.py new file mode 100644 index 0000000..fca5212 --- /dev/null +++ b/dataflows/processors/unwind.py @@ -0,0 +1,54 @@ +from dataflows.helpers.resource_matcher import ResourceMatcher +from dataflows.processors.add_computed_field import get_new_fields + + +def unwind(from_key: str, to_key: dict, transformer=None, resources=None, source_delete=True): + + """From a row of data, generate a row per value from from_key, where the value is set onto to_key.""" + + def _unwinder(rows): + for row in rows: + try: + iter(row[from_key]) + for value in row[from_key]: + ret = {} + ret.update(row) + ret[to_key['name']] = value if transformer is None else transformer(value) + if source_delete is True: + del ret[from_key] + yield ret + except TypeError: + # no iterable to unwind. Take the value we have and set it on the to_key. + ret = {} + ret.update(row) + ret[to_key['name']] = ( + ret[from_key] if transformer is None else transformer(ret[from_key]) + ) + if source_delete is True: + del ret[from_key] + yield ret + + def func(package): + matcher = ResourceMatcher(resources, package.pkg) + for resource in package.pkg.descriptor['resources']: + if matcher.match(resource['name']): + new_fields = get_new_fields( + resource, [{'target': {'name': to_key['name'], 'type': to_key['type']}}] + ) + if source_delete is True: + resource['schema']['fields'] = [ + field + for field in resource['schema']['fields'] + if not field['name'] == from_key + ] + resource['schema']['fields'].extend(new_fields) + + yield package.pkg + + for resource in package: + if matcher.match(resource.res.name): + yield _unwinder(resource) + else: + yield resource + + return func diff --git a/tests/test_lib.py b/tests/test_lib.py index cde9044..fb0e97a 100644 --- a/tests/test_lib.py +++ b/tests/test_lib.py @@ -2309,4 +2309,106 @@ def mult(row): add_field('c', 'integer'), parallelize(mult), ).results()[0][0][:100] - print(res) \ No newline at end of file + print(res) + + +def test_unwind_basic(): + from dataflows import Flow, unwind + + data = [ + { + 'id': 1, + 'title': 'Blog Post', + 'tags': ['hello', 'world'], + 'comments': ['Nice post.', 'Well written'], + } + ] + results, dp, _ = Flow( + data, + unwind('tags', {'name': 'tag', 'type': 'string'}), + ).results() + + assert len(results[0]) == 2 + assert results[0][0]['tag'] == 'hello' + assert 'tag' in [f.name for f in dp.resources[0].schema.fields] + + +def test_unwind_twice_in_flow(): + from dataflows import Flow, unwind + + data = [ + { + 'id': 1, + 'title': 'Blog Post', + 'tags': ['hello', 'world'], + 'comments': ['Nice post.', 'Well written'], + } + ] + results, dp, _ = Flow( + data, + unwind('tags', {'name': 'tag', 'type': 'string'}), + unwind('comments', {'name': 'comment', 'type': 'string'}), + ).results() + + assert len(results[0]) == 4 + assert results[0][0]['tag'] == 'hello' + assert results[0][0]['comment'] == 'Nice post.' + + +def test_unwind_from_key_not_iterable(): + from dataflows import Flow, unwind + + data = [ + {'id': 1, 'title': 'Blog Post', 'tags': None, 'comments': ['Nice post.', 'Well written']} + ] + results, dp, _ = Flow( + data, + unwind('tags', {'name': 'tag', 'type': 'string'}), + ).results() + + assert len(results[0]) == 1 + assert results[0][0]['tag'] == None + + +def test_unwind_source_delete(): + from dataflows import Flow, unwind + + data = [{'id': 1, 'title': 'Blog Post', 'tags': ['hello', 'world']}] + results, dp, _ = Flow( + data, + unwind('tags', {'name': 'tag', 'type': 'string'}), + ).results() + + assert len(results[0]) == 2 + assert results[0][0]['tag'] + assert not results[0][0].get('tags') + assert 'tags' not in [f.name for f in dp.resources[0].schema.fields] + + +def test_unwind_source_keep(): + from dataflows import Flow, unwind + + data = [{'id': 1, 'title': 'Blog Post', 'tags': ['hello', 'world']}] + results, dp, _ = Flow( + data, + unwind('tags', {'name': 'tag', 'type': 'string'}, source_delete=False), + ).results() + + assert len(results[0]) == 2 + assert results[0][0]['tag'] + assert results[0][0]['tags'] == data[0]['tags'] + assert 'tags' in [f.name for f in dp.resources[0].schema.fields] + + +def test_unwind_with_transformer(): + from dataflows import Flow, unwind + + data = [{'id': 1, 'title': 'Blog Post', 'tags': ['hello', 'world']}] + results, dp, _ = Flow( + data, + unwind('tags', {'name': 'tag', 'type': 'string'}, lambda v: v.title()), + ).results() + + assert len(results[0]) == 2 + assert results[0][0]['tag'] == 'Hello' + assert results[0][1]['tag'] == 'World'