Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions test/test_iterdatapipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
MapKeyZipper,
MaxTokenBucketizer,
ParagraphAggregator,
RenameKeys,
Rows2Columnar,
SampleMultiplexer,
UnZipper,
Expand Down Expand Up @@ -902,6 +903,18 @@ def test_mux_longest_iterdatapipe(self):
with self.assertRaises(TypeError):
len(output_dp)

def test_renamer(self):

    # Functional Test: renaming by glob patterns yields renamed keys with values intact
    stage1 = IterableWrapper([
        {"1.txt": "1", "1.bin": "1b"},
        {"2.txt": "2", "2.bin": "2b"},
    ])
    stage2 = RenameKeys(stage1, t="*.txt", b="*.bin")
    output = list(iter(stage2))
    self.assertEqual(len(output), 2)
    self.assertEqual(output[0], {"t": "1", "b": "1b"})
    self.assertEqual(output[1], {"t": "2", "b": "2b"})

    # Functional Test: keep_unselected=True passes unmatched keys through unchanged
    kept = list(RenameKeys(stage1, t="*.txt", keep_unselected=True))
    self.assertEqual(kept[0], {"t": "1", "1.bin": "1b"})

    # Functional Test: must_match=True (default) raises when a pattern matches nothing
    with self.assertRaises(ValueError):
        list(RenameKeys(stage1, t="*.txt", j="*.jpeg"))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please test other boolean flags.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

def test_zip_longest_iterdatapipe(self):

# Functional Test: raises TypeError when an input is not of type `IterDataPipe`
Expand Down
8 changes: 7 additions & 1 deletion torchdata/datapipes/iter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,12 @@
TFRecordLoaderIterDataPipe as TFRecordLoader,
)
from torchdata.datapipes.iter.util.unzipper import UnZipperIterDataPipe as UnZipper
from torchdata.datapipes.iter.util.webdataset import WebDatasetIterDataPipe as WebDataset
from torchdata.datapipes.iter.util.webdataset import (
WebDatasetIterDataPipe as WebDataset,
)
from torchdata.datapipes.iter.util.renamekeys import (
RenameKeysIterDataPipe as RenameKeys,
)
from torchdata.datapipes.iter.util.xzfileloader import (
XzFileLoaderIterDataPipe as XzFileLoader,
XzFileReaderIterDataPipe as XzFileReader,
Expand Down Expand Up @@ -172,6 +177,7 @@
"ParagraphAggregator",
"ParquetDataFrameLoader",
"RarArchiveLoader",
"RenameKeys",
"RoutedDecoder",
"Rows2Columnar",
"S3FileLister",
Expand Down
79 changes: 79 additions & 0 deletions torchdata/datapipes/iter/util/renamekeys.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

import re
from fnmatch import fnmatch
from typing import Dict, Iterator, List, Union

from torchdata.datapipes import functional_datapipe
from torchdata.datapipes.iter import IterDataPipe


@functional_datapipe("rename_keys")
class RenameKeysIterDataPipe(IterDataPipe[Dict]):
r"""
Given a stream of dictionaries, rename keys using glob patterns.
Args:
source_datapipe: a DataPipe yielding a stream of dictionaries.
keep_unselected: keep keys/value pairs even if they don't match any pattern (False)
must_match: every pattern must match at least one key in each sample (True)
duplicate_is_error: it is an error if two renamings yield the same key (True)
Comment on lines +27 to +29
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Should we move these after *args?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
duplicate_is_error: it is an error if two renamings yield the same key (True)
duplicate_is_error: it is an error if two renamings yield the same key (True); otherwise the first matched one will be returned

*args: `(renamed, pattern)` pairs
**kw: `renamed=pattern` pairs
Returns:
a DataPipe yielding a stream of dictionaries.
Examples:
>>> dp = IterableWrapper([{"/a/b.jpg": b"data"}]).rename_keys(image="*.jpg")
"""

def __init__(
    self,
    source_datapipe: IterDataPipe[List[Union[Dict, List]]],
    *args,
    keep_unselected=False,
    must_match=True,
    duplicate_is_error=True,
    **kw,
) -> None:
    """Store the source pipe, the boolean flags, and the normalized renaming rules.

    Renamings arrive either as positional ``(new_name, pattern)`` pairs in
    ``*args`` or as ``new_name=pattern`` keyword arguments in ``**kw``; both
    are flattened into one ``(pattern, new_name)`` list, positionals first.
    """
    super().__init__()
    self.source_datapipe: IterDataPipe[List[Union[Dict, List]]] = source_datapipe
    self.keep_unselected = keep_unselected
    self.must_match = must_match
    self.duplicate_is_error = duplicate_is_error
    # Normalize both argument forms into a single (pattern, new_name) list.
    pairs = list(args) + list(kw.items())
    self.renamings = [(pattern, new_name) for new_name, pattern in pairs]

def __iter__(self) -> Iterator[Dict]:
    """Yield each input dictionary with its keys renamed per the patterns."""
    for sample in self.source_datapipe:
        renamed: Dict = {}
        # Track which patterns fired so `must_match` can be enforced below.
        seen = dict.fromkeys((pattern for pattern, _ in self.renamings), False)
        for key, value in sample.items():
            # Match against the basename only: strip any leading path components.
            basename = re.sub(r".*/", "", key)
            target = None
            # Later renamings take precedence over earlier ones, hence the reversed scan.
            for pattern, output in reversed(self.renamings):
                if fnmatch(basename.lower(), pattern):
                    seen[pattern] = True
                    target = output
                    break
            if target is None:
                # Unmatched key: either pass it through untouched or drop it.
                if self.keep_unselected:
                    renamed[key] = value
                continue
            if target in renamed:
                # Two source keys renamed to the same output key; first one wins
                # unless duplicates are configured to be an error.
                if self.duplicate_is_error:
                    raise ValueError(f"Duplicate value in sample {sample.keys()} after rename.")
                continue
            renamed[target] = value
        if self.must_match and not all(seen.values()):
            raise ValueError(f"Not all patterns ({seen}) matched sample keys ({sample.keys()}).")

        yield renamed

def __len__(self) -> int:
    # One output dict per input sample, so length delegates to the source
    # (which may itself raise TypeError if it has no length).
    source = self.source_datapipe
    return len(source)