diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index e77158c98..ccac89c30 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -92,17 +92,22 @@ When adding a new DataPipe, there are few things that need to be done to ensure
    [test requirements](https://github.com/pytorch/data/issues/106) that we have.
    - One test that is commonly missed is the serialization test. Please add the new DataPipe to
      [`test_serialization.py`](https://github.com/pytorch/data/blob/main/test/test_serialization.py).
-   - If your test requires interacting with files in the file system (e.g. opening a `csv` or `tar` file, we prefer
-     those files to be generated during the test (see `test_local_io.py`). If the file is on a remote server, see
-     `test_remote_io.py`.
+   - If your test requires interacting with files in the file system (e.g. opening a `csv` or `tar` file), we prefer
+     those files to be generated during the test (see
+     [`test_local_io.py`](https://github.com/pytorch/data/blob/main/test/test_local_io.py)). If the file is on a remote
+     server, see [`test_remote_io.py`](https://github.com/pytorch/data/blob/main/test/test_remote_io.py).
 3. Documentation - ensure that the DataPipe has docstring, usage example, and that it is added to the right category of
-   the right RST file to be rendered.
-   - If your DataPipe has a functional form (i.e. `@functional_datapipe(...)`), include at the
+   the right RST file (in [`docs/source`](https://github.com/pytorch/data/tree/main/docs/source)) to be rendered.
+   - If your DataPipe has a functional form (i.e. `@functional_datapipe(...)`), include it at the
     [end of the first sentence](https://github.com/pytorch/data/blob/main/torchdata/datapipes/iter/util/combining.py#L25)
     of your docstring. This will make sure it correctly shows up in the
     [summary table](https://pytorch.org/data/main/torchdata.datapipes.iter.html#archive-datapipes) of our
     documentation.
-4. Import - import the DataPipe in the correct `__init__.py` file.
+   - For usage examples we support both standard doctest-style interactive Python sessions and code-output-style
+     blocks. See [sphinx doctest](https://www.sphinx-doc.org/en/master/usage/extensions/doctest.html) for more
+     information. To build the documentation and validate that your example works please refer to
+     [`docs`](https://github.com/pytorch/data/tree/main/docs).
+4. Import - import the DataPipe in the correct `__init__.py` file and add it to the `__all__` list.
 5. Interface - if the DataPipe has a functional form, make sure that is generated properly by `gen_pyi.py` into the
    relevant interface file.
    - You can re-generate the pyi files by re-running `pip install -e .`, then you can examine the new outputs.
diff --git a/docs/README.md b/docs/README.md
index 14ca32cdb..37dad1319 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -8,12 +8,23 @@
 pip install -r requirements.txt
 ```
 
 You can then build the documentation by running `make <format>` from the `docs/` folder. Run `make` to get a list of all
-available output formats.
+available output formats. Run
+
+```bash
+make html
+```
+
+to build the documentation. The html files can then be found in `build/html`. To validate the code examples use:
+
+```bash
+make doctest
+```
+
+Note that currently only code-output-style blocks are tested, as many standard reST doctest examples do not work at the
+moment. The results can then be found in `build/html/output.txt`. To also test interactive Python sessions you can
+temporarily replace `doctest_test_doctest_blocks` in
+[`source/conf.py`](https://github.com/pytorch/data/blob/main/docs/source/conf.py) with a non-empty string.
 
 ## Improving the Documentation
 
 Feel free to open an issue or pull request to inform us of any inaccuracy or potential improvement that we can make to
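For reference, the two supported example styles mentioned above look as follows under
[`sphinx.ext.doctest`](https://www.sphinx-doc.org/en/master/usage/extensions/doctest.html). This is a minimal sketch,
not part of the patch; the pipeline and values are illustrative only:

```rst
An interactive Python session, checked line by line:

.. doctest::

    >>> from torchdata.datapipes.iter import IterableWrapper
    >>> list(IterableWrapper(range(3)))
    [0, 1, 2]

A code-output-style block, where everything printed to stdout is compared as a whole:

.. testcode::

    from torchdata.datapipes.iter import IterableWrapper

    print(list(IterableWrapper(range(3))))

.. testoutput::

    [0, 1, 2]
```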
diff --git a/docs/source/conf.py b/docs/source/conf.py
index 3391a3dcc..1fd6b4d4a 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -61,6 +61,24 @@
 # be successively migrated to sphinx's doctest directive.
 doctest_test_doctest_blocks = ""
 
+doctest_global_setup = """
+import torch
+from torchdata.datapipes.iter import IterableWrapper, FileLister, FileOpener
+
+io_doctest = True
+
+try:
+    import torcharrow.dtypes as dt
+except ImportError:
+    dt = None
+
+try:
+    import rarfile
+    rarfile.tool_setup()
+except Exception:
+    rarfile = None
+"""
+
 # Add any paths that contain templates here, relative to this directory.
 templates_path = ["_templates"]
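Everything defined in this `doctest_global_setup` string is available in every `testcode` block, which is why the
examples below use `IterableWrapper`, `FileLister`, and `FileOpener` without importing them, and why `:skipif:`
conditions can refer to `dt`, `rarfile`, and `io_doctest`. A minimal sketch of an example leaning on that setup
(illustrative, not part of the patch):

```rst
.. testcode::
    :skipif: dt is None

    # IterableWrapper is pre-imported by doctest_global_setup; dt is the
    # torcharrow.dtypes module, or None when torcharrow is not installed.
    dp = IterableWrapper([(0,), (1,)])
    DTYPE = dt.Struct([dt.Field("Values", dt.int32)])
```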
diff --git a/torchdata/datapipes/iter/util/bz2fileloader.py b/torchdata/datapipes/iter/util/bz2fileloader.py
index 442df0392..dcd280c2f 100644
--- a/torchdata/datapipes/iter/util/bz2fileloader.py
+++ b/torchdata/datapipes/iter/util/bz2fileloader.py
@@ -33,13 +33,20 @@ class Bz2FileLoaderIterDataPipe(IterDataPipe[Tuple[str, BufferedIOBase]]):
         or let Python's GC close them periodically.
 
     Example:
-        >>> from torchdata.datapipes.iter import FileLister, FileOpener
-        >>> datapipe1 = FileLister(".", "*.bz2")
-        >>> datapipe2 = FileOpener(datapipe1, mode="b")
-        >>> bz2_loader_dp = datapipe2.load_from_bz2()
-        >>> for _, stream in bz2_loader_dp:
-        >>>     print(stream.read())
+
+        .. testcode::
+
+            filenames_dp = FileLister(".", "*.bz2")
+            files_dp = filenames_dp.open_files(mode="b")
+            bz2_loader_dp = files_dp.load_from_bz2()
+            for _, stream in bz2_loader_dp:
+                print(stream.read())
+
+        .. testoutput::
+            :skipif: io_doctest
+
+            b'0123456789abcdef'
+
     """
 
     def __init__(self, datapipe: Iterable[Tuple[str, BufferedIOBase]], length: int = -1) -> None:
diff --git a/torchdata/datapipes/iter/util/combining.py b/torchdata/datapipes/iter/util/combining.py
index bd9d70769..b543ffc5f 100644
--- a/torchdata/datapipes/iter/util/combining.py
+++ b/torchdata/datapipes/iter/util/combining.py
@@ -43,16 +43,24 @@ class IterKeyZipperIterDataPipe(IterDataPipe[T_co]):
             by default a tuple is created
 
     Example:
-        >>> from torchdata.datapipes.iter import IterableWrapper
-        >>> from operator import itemgetter
-        >>> def merge_fn(t1, t2):
-        >>>     return t1[1] + t2[1]
-        >>> dp1 = IterableWrapper([('a', 100), ('b', 200), ('c', 300)])
-        >>> dp2 = IterableWrapper([('a', 1), ('b', 2), ('c', 3), ('d', 4)])
-        >>> res_dp = dp1.zip_with_iter(dp2, key_fn=itemgetter(0),
-        >>>     ref_key_fn=itemgetter(0), keep_key=True, merge_fn=merge_fn)
-        >>> list(res_dp)
+
+        .. testcode::
+
+            from operator import itemgetter
+
+            def merge_fn(t1, t2):
+                return t1[1] + t2[1]
+
+            dp1 = IterableWrapper([('a', 100), ('b', 200), ('c', 300)])
+            dp2 = IterableWrapper([('a', 1), ('b', 2), ('c', 3), ('d', 4)])
+            res_dp = dp1.zip_with_iter(dp2, key_fn=itemgetter(0),
+                                       ref_key_fn=itemgetter(0), keep_key=True, merge_fn=merge_fn)
+            print(list(res_dp))
+
+        .. testoutput::
+
+            [('a', 101), ('b', 202), ('c', 303)]
+
     """
 
     def __init__(
diff --git a/torchdata/datapipes/iter/util/dataframemaker.py b/torchdata/datapipes/iter/util/dataframemaker.py
index 5e24e8496..1b8b3d4a0 100644
--- a/torchdata/datapipes/iter/util/dataframemaker.py
+++ b/torchdata/datapipes/iter/util/dataframemaker.py
@@ -54,19 +54,27 @@ class DataFrameMakerIterDataPipe(IterDataPipe):  # IterDataPipe[torcharrow.IData
         device: specify the device on which the DataFrame will be stored
 
     Example:
-        >>> from torchdata.datapipes.iter import IterableWrapper
-        >>> import torcharrow.dtypes as dt
-        >>> source_data = [(i,) for i in range(3)]
-        >>> source_dp = IterableWrapper(source_data)
-        >>> DTYPE = dt.Struct([dt.Field("Values", dt.int32)])
-        >>> df_dp = source_dp.dataframe(dtype=DTYPE)
-        >>> list(df_dp)[0]
+
+        .. testcode::
+            :skipif: dt is None
+
+            import torcharrow.dtypes as dt
+
+            source_data = [(i,) for i in range(3)]
+            source_dp = IterableWrapper(source_data)
+            DTYPE = dt.Struct([dt.Field("Values", dt.int32)])
+            df_dp = source_dp.dataframe(dtype=DTYPE)
+            print(list(df_dp)[0])
+
+        .. testoutput::
+            :skipif: io_doctest
+
           index    Values
         -------  --------
               0         0
               1         1
               2         2
         dtype: Struct([Field('Values', int32)]), count: 3, null_count: 0
+
     """
 
     def __new__(
@@ -105,18 +113,26 @@ class ParquetDFLoaderIterDataPipe(IterDataPipe):  # IterDataPipe[torcharrow.IDat
         device: specify the device on which the DataFrame will be stored
 
     Example:
-        >>> from torchdata.datapipes.iter import FileLister
-        >>> import torcharrow.dtypes as dt
-        >>> DTYPE = dt.Struct([dt.Field("Values", dt.int32)])
-        >>> source_dp = FileLister(".", masks="df*.parquet")
-        >>> parquet_df_dp = source_dp.load_parquet_as_df(dtype=DTYPE)
-        >>> list(parquet_df_dp)[0]
+
+        .. testcode::
+            :skipif: dt is None
+
+            import torcharrow.dtypes as dt
+
+            DTYPE = dt.Struct([dt.Field("Values", dt.int32)])
+            source_dp = FileLister(".", masks="df*.parquet")
+            parquet_df_dp = source_dp.load_parquet_as_df(dtype=DTYPE)
+            print(list(parquet_df_dp)[0])
+
+        .. testoutput::
+            :skipif: io_doctest
+
           index    Values
         -------  --------
               0         0
               1         1
               2         2
         dtype: Struct([Field('Values', int32)]), count: 3, null_count: 0
+
     """
 
     def __init__(
diff --git a/torchdata/datapipes/iter/util/decompressor.py b/torchdata/datapipes/iter/util/decompressor.py
index aafcb7144..474a634a4 100644
--- a/torchdata/datapipes/iter/util/decompressor.py
+++ b/torchdata/datapipes/iter/util/decompressor.py
@@ -41,13 +41,20 @@ class DecompressorIterDataPipe(IterDataPipe[Tuple[str, StreamWrapper]]):
         file_type: Optional `string` or ``CompressionType`` that represents what compression format of the inputs
 
     Example:
-        >>> from torchdata.datapipes.iter import FileLister, FileOpener
-        >>> tar_file_dp = FileLister(self.temp_dir.name, "*.tar")
-        >>> tar_load_dp = FileOpener(tar_file_dp, mode="b")
-        >>> tar_decompress_dp = Decompressor(tar_load_dp, file_type="tar")
-        >>> for _, stream in tar_decompress_dp:
-        >>>     print(stream.read())
+
+        .. testcode::
+
+            tar_file_dp = FileLister(".", "*.tar")
+            tar_load_dp = tar_file_dp.open_files(mode="b")
+            tar_decompress_dp = tar_load_dp.decompress(file_type="tar")
+            for _, stream in tar_decompress_dp:
+                print(stream.read())
+
+        .. testoutput::
+            :skipif: io_doctest
+
+            b'0123456789abcdef'
+
     """
 
     types = CompressionType
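The archive-loader examples above (and the `tar`/`xz`/`zip` ones later in this diff) assume an archive containing
`b'0123456789abcdef'` already sits in the working directory, which is why their outputs are guarded with
`:skipif: io_doctest`. If one wanted to make them runnable, a `testsetup` block along these lines could generate the
fixtures. This is a sketch using only the standard library; the file names are illustrative and not part of the patch:

```rst
.. testsetup::

    import bz2
    import tarfile

    # Write the payload the examples expect to read back.
    with open("fixture.txt", "wb") as f:
        f.write(b"0123456789abcdef")

    # A .tar archive for the tar/decompress examples ...
    with tarfile.open("fixture.tar", "w") as archive:
        archive.add("fixture.txt")

    # ... and a .bz2 file for the bz2 example.
    with bz2.open("fixture.bz2", "wb") as f:
        f.write(b"0123456789abcdef")
```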
diff --git a/torchdata/datapipes/iter/util/hashchecker.py b/torchdata/datapipes/iter/util/hashchecker.py
index 9cb32ac6a..abbee4a4b 100644
--- a/torchdata/datapipes/iter/util/hashchecker.py
+++ b/torchdata/datapipes/iter/util/hashchecker.py
@@ -33,19 +33,22 @@ class HashCheckerIterDataPipe(IterDataPipe[Tuple[str, U]]):
             does not work with non-seekable stream, e.g. HTTP)
 
     Example:
-        >>> from torchdata.datapipes.iter import IterableWrapper, FileOpener
-        >>> expected_MD5_hash = "bb9675028dd39d2dd2bf71002b93e66c"
-        File is from "https://raw.githubusercontent.com/pytorch/data/main/LICENSE"
-        >>> file_dp = FileOpener(IterableWrapper(["LICENSE.txt"]), mode='rb')
-        >>> # An exception is only raised when the hash doesn't match, otherwise (path, stream) is returned
-        >>> check_hash_dp = file_dp.check_hash({"LICENSE.txt": expected_MD5_hash}, "md5", rewind=True)
-        >>> reader_dp = check_hash_dp.readlines()
-        >>> it = iter(reader_dp)
-        >>> path, line = next(it)
-        >>> path
-        LICENSE.txt
-        >>> line
-        b'BSD 3-Clause License'
+
+        .. testcode::
+            :skipif: io_doctest
+
+            expected_MD5_hash = "bb9675028dd39d2dd2bf71002b93e66c"
+            # File is from "https://raw.githubusercontent.com/pytorch/data/main/LICENSE"
+            file_dp = FileOpener(IterableWrapper(["LICENSE.txt"]), mode='rb')
+            # An exception is only raised when the hash doesn't match, otherwise (path, stream) is returned
+            check_hash_dp = file_dp.check_hash({"LICENSE.txt": expected_MD5_hash}, "md5", rewind=True)
+            reader_dp = check_hash_dp.readlines()
+
+            it = iter(reader_dp)
+            path, line = next(it)
+            assert path == "LICENSE.txt"
+            assert line == b'BSD 3-Clause License'
+
     """
 
     def __init__(
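As an aside, the expected digest used in the example above can be reproduced with the standard library. A minimal
sketch for computing the hex digest that `check_hash` compares against (the helper name `md5_of` is illustrative):

```python
import hashlib

def md5_of(path: str) -> str:
    # Hex digest in the format that check_hash expects.
    with open(path, "rb") as f:
        return hashlib.md5(f.read()).hexdigest()

# e.g. md5_of("LICENSE.txt") == "bb9675028dd39d2dd2bf71002b93e66c"
```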
diff --git a/torchdata/datapipes/iter/util/header.py b/torchdata/datapipes/iter/util/header.py
index b16efe564..961f6091d 100644
--- a/torchdata/datapipes/iter/util/header.py
+++ b/torchdata/datapipes/iter/util/header.py
@@ -78,17 +78,21 @@ class LengthSetterIterDataPipe(IterDataPipe[T_co]):
         length: the integer value that will be set as the length
 
     Example:
-        >>> from torchdata.datapipes.iter import IterableWrapper
-        >>> dp = IterableWrapper(range(10)).filter(lambda x: x < 5).set_length(3)
-        >>> list(dp)  # Notice that the number of elements yielded is unchanged
-        [0, 1, 2, 3, 4]
-        >>> len(dp)
-        3
-        >>> header_dp = IterableWrapper(range(10)).filter(lambda x: x < 5).header(3)
-        >>> list(header_dp)  # Use `.header()` if you want to limit the number of elements yielded
-        [0, 1, 2]
-        >>> len(header_dp)
-        3
+
+        .. testcode::
+
+            dp = IterableWrapper(range(10)).filter(lambda x: x < 5).set_length(3)
+            # Notice that the number of elements yielded is unchanged
+            assert list(dp) == [0, 1, 2, 3, 4]
+            assert len(dp) == 3
+
+        .. testcode::
+
+            header_dp = IterableWrapper(range(10)).filter(lambda x: x < 5).header(3)
+            # Use `.header()` if you want to limit the number of elements yielded
+            assert list(header_dp) == [0, 1, 2]
+            assert len(header_dp) == 3
+
     """
 
     def __init__(self, source_datapipe: IterDataPipe[T_co], length: int) -> None:
diff --git a/torchdata/datapipes/iter/util/jsonparser.py b/torchdata/datapipes/iter/util/jsonparser.py
index 68a45259a..c270cf83b 100644
--- a/torchdata/datapipes/iter/util/jsonparser.py
+++ b/torchdata/datapipes/iter/util/jsonparser.py
@@ -21,16 +21,29 @@ class JsonParserIterDataPipe(IterDataPipe[Tuple[str, Dict]]):
         kwargs: keyword arguments that will be passed through to ``json.loads``
 
     Example:
-        >>> from torchdata.datapipes.iter import IterableWrapper, FileOpener
-        >>> import os
-        >>> def get_name(path_and_stream):
-        >>>     return os.path.basename(path_and_stream[0]), path_and_stream[1]
-        >>> datapipe1 = IterableWrapper(["empty.json", "1.json", "2.json"])
-        >>> datapipe2 = FileOpener(datapipe1, mode="b")
-        >>> datapipe3 = datapipe2.map(get_name)
-        >>> json_dp = datapipe3.parse_json_files()
-        >>> list(json_dp)
+
+        .. testcode::
+            :skipif: io_doctest
+
+            # assume the files look like this:
+            # 1.json: '["foo", {"bar":["baz", null, 1.0, 2]}]'
+            # 2.json: '{"__complex__": true, "real": 1, "imag": 2}'
+
+            import os
+
+            def get_name(path_and_stream):
+                return os.path.basename(path_and_stream[0]), path_and_stream[1]
+
+            source_dp = IterableWrapper(["1.json", "2.json"])
+            datapipe2 = source_dp.open_files(mode="b")
+            datapipe3 = datapipe2.map(get_name)
+            json_dp = datapipe3.parse_json_files()
+            print(list(json_dp))
+
+        .. testoutput::
+
+            [('1.json', ['foo', {'bar': ['baz', None, 1.0, 2]}]), ('2.json', {'__complex__': True, 'real': 1, 'imag': 2})]
+
     """
 
     def __init__(self, source_datapipe: IterDataPipe[Tuple[str, IO]], **kwargs) -> None:
diff --git a/torchdata/datapipes/iter/util/mux_longest.py b/torchdata/datapipes/iter/util/mux_longest.py
index 2b6e328b9..bd849b42d 100644
--- a/torchdata/datapipes/iter/util/mux_longest.py
+++ b/torchdata/datapipes/iter/util/mux_longest.py
@@ -21,10 +21,16 @@ class MultiplexerLongestIterDataPipe(IterDataPipe):
         datapipes: Iterable DataPipes that will take turn to yield their elements, until they are all exhausted
 
     Example:
-        >>> from torchdata.datapipes.iter import IterableWrapper
-        >>> dp1, dp2, dp3 = IterableWrapper(range(5)), IterableWrapper(range(10, 15)), IterableWrapper(range(20, 25))
-        >>> list(dp1.mux_longest(dp2, dp3))
-        [0, 10, 20, 1, 11, 21, 2, 12, 22, 3, 13, 23, 4, 14, 24]
+
+        .. testcode::
+
+            dp1, dp2, dp3 = IterableWrapper(range(5)), IterableWrapper(range(10, 12)), IterableWrapper(range(20, 25))
+            print(list(dp1.mux_longest(dp2, dp3)))
+
+        .. testoutput::
+
+            [0, 10, 20, 1, 11, 21, 2, 22, 3, 23, 4, 24]
+
     """
 
     def __init__(self, *datapipes):
diff --git a/torchdata/datapipes/iter/util/plain_text_reader.py b/torchdata/datapipes/iter/util/plain_text_reader.py
index b5d876bd3..b94132008 100644
--- a/torchdata/datapipes/iter/util/plain_text_reader.py
+++ b/torchdata/datapipes/iter/util/plain_text_reader.py
@@ -99,14 +99,21 @@ class LineReaderIterDataPipe(IterDataPipe[Union[Str_Or_Bytes, Tuple[str, Str_Or_
             than just the contents
 
     Example:
-        >>> from torchdata.datapipes.iter import IterableWrapper
-        >>> import io
-        >>> text1 = "Line1\nLine2"
-        >>> text2 = "Line2,1\r\nLine2,2\r\nLine2,3"
-        >>> source_dp = IterableWrapper([("file1", io.StringIO(text1)), ("file2", io.StringIO(text2))])
-        >>> line_reader_dp = source_dp.readlines()
-        >>> list(line_reader_dp)
+
+        .. testcode::
+
+            import io
+
+            text1 = "Line1\nLine2"
+            text2 = "Line2,1\r\nLine2,2\r\nLine2,3"
+            source_dp = IterableWrapper([("file1", io.StringIO(text1)), ("file2", io.StringIO(text2))])
+            line_reader_dp = source_dp.readlines()
+            print(list(line_reader_dp))
+
+        .. testoutput::
+
+            [('file1', 'Line1'), ('file1', 'Line2'), ('file2', 'Line2,1'), ('file2', 'Line2,2'), ('file2', 'Line2,3')]
+
     """
 
     def __init__(
@@ -192,16 +199,30 @@ class CSVParserIterDataPipe(_CSVBaseParserIterDataPipe):
         as_tuple: if ``True``, each line will return a tuple instead of a list
 
     Example:
-        >>> from torchdata.datapipes.iter import IterableWrapper, FileOpener
-        >>> import os
-        >>> def get_name(path_and_stream):
-        >>>     return os.path.basename(path_and_stream[0]), path_and_stream[1]
-        >>> datapipe1 = IterableWrapper(["1.csv", "empty.csv", "empty2.csv"])
-        >>> datapipe2 = FileOpener(datapipe1, mode="b")
-        >>> datapipe3 = datapipe2.map(get_name)
-        >>> csv_parser_dp = datapipe3.parse_csv()
-        >>> list(csv_parser_dp)
+
+        .. testcode::
+            :skipif: io_doctest
+
+            # assume the files look like this:
+            # 1.csv: "key,item\na,1\nb,2"
+            # empty.csv: ""
+            # empty2.csv: "\n"
+
+            import os
+
+            def get_name(path_and_stream):
+                return os.path.basename(path_and_stream[0]), path_and_stream[1]
+
+            source_dp = IterableWrapper(["1.csv", "empty.csv", "empty2.csv"])
+            files_dp = source_dp.open_files(mode="b")
+            name_and_stream_dp = files_dp.map(get_name)
+            csv_parser_dp = name_and_stream_dp.parse_csv()
+            print(list(csv_parser_dp))
+
+        .. testoutput::
+
+            [['key', 'item'], ['a', '1'], ['b', '2'], []]
+
     """
 
     def __init__(
@@ -250,16 +271,30 @@ class CSVDictParserIterDataPipe(_CSVBaseParserIterDataPipe):
             than just the contents
 
     Example:
-        >>> from torchdata.datapipes.iter import FileLister, FileOpener
-        >>> import os
-        >>> def get_name(path_and_stream):
-        >>>     return os.path.basename(path_and_stream[0]), path_and_stream[1]
-        >>> datapipe1 = FileLister(".", "*.csv")
-        >>> datapipe2 = FileOpener(datapipe1, mode="b")
-        >>> datapipe3 = datapipe2.map(get_name)
-        >>> csv_dict_parser_dp = datapipe3.parse_csv_as_dict()
-        >>> list(csv_dict_parser_dp)
+
+        .. testcode::
+            :skipif: io_doctest
+
+            # assume the files look like this:
+            # 1.csv: "key,item\na,1\nb,2"
+            # empty.csv: ""
+            # empty2.csv: "\n"
+
+            import os
+
+            def get_name(path_and_stream):
+                return os.path.basename(path_and_stream[0]), path_and_stream[1]
+
+            source_dp = IterableWrapper(["1.csv", "empty.csv", "empty2.csv"])
+            files_dp = source_dp.open_files(mode="b")
+            name_and_stream_dp = files_dp.map(get_name)
+            csv_dict_parser_dp = name_and_stream_dp.parse_csv_as_dict()
+            print(list(csv_dict_parser_dp))
+
+        .. testoutput::
+
+            [{'key': 'a', 'item': '1'}, {'key': 'b', 'item': '2'}]
+
     """
 
     def __init__(
diff --git a/torchdata/datapipes/iter/util/prefetcher.py b/torchdata/datapipes/iter/util/prefetcher.py
index 7fb02b1ac..658b29ca3 100644
--- a/torchdata/datapipes/iter/util/prefetcher.py
+++ b/torchdata/datapipes/iter/util/prefetcher.py
@@ -49,8 +49,15 @@ class PrefetcherIterDataPipe(IterDataPipe):
         buffer_size: the size of the buffer which stores the prefetched samples
 
     Example:
-        >>> from torchdata.datapipes.iter import IterableWrapper
-        >>> dp = IterableWrapper(file_paths).open_files().prefetch(5)
+
+        .. testsetup::
+
+            file_paths = []
+
+        .. testcode::
+
+            dp = IterableWrapper(file_paths).open_files().prefetch(5)
+
     """
 
     def __init__(self, source_datapipe, buffer_size: int = 10):
@@ -157,8 +164,17 @@ class PinMemoryIterDataPipe(PrefetcherIterDataPipe):
         A ``pin_memory_fn`` to handle general objects is provided by default.
 
     Example:
-        >>> from torchdata.datapipes.iter import IterableWrapper
-        >>> dp = IterableWrapper(file_paths).open_files().readlines().map(tokenize_fn).pin_memory()
+
+        .. testsetup::
+
+            file_paths = []
+            tokenize_fn = lambda x: x
+
+        .. testcode::
+            :skipif: not torch.cuda.is_available()
+
+            dp = IterableWrapper(file_paths).open_files().readlines().map(tokenize_fn).pin_memory()
+
     """
 
    def __init__(self, source_datapipe, device=None, pin_memory_fn=pin_memory_fn):
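Since prefetching changes when elements are produced rather than what is produced, a self-contained variant of the
example above could run under `make doctest` without touching the file system. A sketch, relying on the pre-imported
`IterableWrapper` and the `buffer_size` parameter shown in the signature above:

```rst
.. testcode::

    # The prefetcher buffers up to 2 elements ahead, but the yielded
    # sequence is identical to the un-prefetched pipeline.
    dp = IterableWrapper(range(5)).prefetch(buffer_size=2)
    print(list(dp))

.. testoutput::

    [0, 1, 2, 3, 4]
```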
diff --git a/torchdata/datapipes/iter/util/randomsplitter.py b/torchdata/datapipes/iter/util/randomsplitter.py
index 27732314f..5a8d035cc 100644
--- a/torchdata/datapipes/iter/util/randomsplitter.py
+++ b/torchdata/datapipes/iter/util/randomsplitter.py
@@ -31,7 +31,7 @@ class RandomSplitterIterDataPipe(IterDataPipe):
             resulting DataPipes' length values to be known in advance.
         seed: random _seed used to determine the randomness of the split
         total_length: Length of the ``source_datapipe``, optional but providing an integer is highly encouraged,
-            because not all ``IterDataPipe`` has ``len``, espeically ones that can be easily known in advance.
+            because not all ``IterDataPipe`` have ``len``, especially ones whose length cannot be easily known in advance.
         target: Optional key (that must exist in ``weights``) to indicate the specific group to return.
             If set to the default ``None``, returns ``List[IterDataPipe]``.
             If target is specified, returns ``IterDataPipe``.
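The hunk above only fixes the doc text; for context, a usage sketch of this DataPipe's functional form, under the
assumption that `RandomSplitterIterDataPipe` is registered as `random_split` (parameter names taken from the docstring
above):

```rst
.. testcode::

    source_dp = IterableWrapper(range(10))
    # total_length is passed explicitly because a generic IterDataPipe
    # may not implement len().
    train, valid = source_dp.random_split(
        total_length=10, weights={"train": 0.5, "valid": 0.5}, seed=0
    )
    assert len(list(train)) + len(list(valid)) == 10
```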
diff --git a/torchdata/datapipes/iter/util/rararchiveloader.py b/torchdata/datapipes/iter/util/rararchiveloader.py
index da9610a54..2aa5db01b 100644
--- a/torchdata/datapipes/iter/util/rararchiveloader.py
+++ b/torchdata/datapipes/iter/util/rararchiveloader.py
@@ -56,13 +56,21 @@ class RarArchiveLoaderIterDataPipe(IterDataPipe[Tuple[str, io.BufferedIOBase]]):
         length: Nominal length of the DataPipe
 
     Example:
-        >>> from torchdata.datapipes.iter import FileLister, FileOpener
-        >>> datapipe1 = FileLister(".", "*.rar")
-        >>> datapipe2 = FileOpener(datapipe1, mode="b")
-        >>> rar_loader_dp = datapipe2.load_from_rar()
-        >>> for _, stream in rar_loader_dp:
-        >>>     print(stream.read())
+
+        .. testcode::
+            :skipif: rarfile is None
+
+            datapipe1 = FileLister(".", "*.rar")
+            datapipe2 = FileOpener(datapipe1, mode="b")
+            rar_loader_dp = datapipe2.load_from_rar()
+            for _, stream in rar_loader_dp:
+                print(stream.read())
+
+        .. testoutput::
+            :skipif: io_doctest
+
+            b'0123456789abcdef'
+
     """
 
     def __init__(self, datapipe: IterDataPipe[Tuple[str, io.BufferedIOBase]], *, length: int = -1):
diff --git a/torchdata/datapipes/iter/util/rows2columnar.py b/torchdata/datapipes/iter/util/rows2columnar.py
index 14ffdf93c..1f04d0f7c 100644
--- a/torchdata/datapipes/iter/util/rows2columnar.py
+++ b/torchdata/datapipes/iter/util/rows2columnar.py
@@ -35,18 +35,15 @@ class Rows2ColumnarIterDataPipe(IterDataPipe[Dict]):
         >>> dp = IterableWrapper([[{'a': 1}, {'b': 2, 'a': 1}], [{'a': 1, 'b': 200}, {'b': 2, 'c': 3, 'a': 100}]])
         >>> row2col_dp = dp.rows2columnar()
         >>> list(row2col_dp)
-        [defaultdict(<class 'list'>, {'a': [1, 1], 'b': [2]}),
-         defaultdict(<class 'list'>, {'a': [1, 100], 'b': [200, 2], 'c': [3]})]
+        [defaultdict(<class 'list'>, {'a': [1, 1], 'b': [2]}), defaultdict(<class 'list'>, {'a': [1, 100], 'b': [200, 2], 'c': [3]})]
         >>> row2col_dp = dp.rows2columnar(column_names=['a'])
         >>> list(row2col_dp)
-        [defaultdict(<class 'list'>, {'a': [1, 1]}),
-         defaultdict(<class 'list'>, {'a': [1, 100]})]
+        [defaultdict(<class 'list'>, {'a': [1, 1]}), defaultdict(<class 'list'>, {'a': [1, 100]})]
         >>> # Each element in a batch is a `List`
         >>> dp = IterableWrapper([[[0, 1, 2, 3], [4, 5, 6, 7]]])
         >>> row2col_dp = dp.rows2columnar(column_names=["1st_in_batch", "2nd_in_batch", "3rd_in_batch", "4th_in_batch"])
         >>> list(row2col_dp)
-        [defaultdict(<class 'list'>, {'1st_in_batch': [0, 4], '2nd_in_batch': [1, 5],
-         '3rd_in_batch': [2, 6], '4th_in_batch': [3, 7]})]
+        [defaultdict(<class 'list'>, {'1st_in_batch': [0, 4], '2nd_in_batch': [1, 5], '3rd_in_batch': [2, 6], '4th_in_batch': [3, 7]})]
     """
 
     column_names: List[str]
diff --git a/torchdata/datapipes/iter/util/samplemultiplexer.py b/torchdata/datapipes/iter/util/samplemultiplexer.py
index 6a06d596e..3f55c1acc 100644
--- a/torchdata/datapipes/iter/util/samplemultiplexer.py
+++ b/torchdata/datapipes/iter/util/samplemultiplexer.py
@@ -31,8 +31,8 @@ class SampleMultiplexerDataPipe(IterDataPipe[T_co]):
 
     Example:
         >>> from torchdata.datapipes.iter import IterableWrapper, SampleMultiplexer
-        >>> source_dp1 = IterableWrapper([0] * 10)
-        >>> source_dp2 = IterableWrapper([1] * 10)
+        >>> source_dp1 = IterableWrapper([0] * 5)
+        >>> source_dp2 = IterableWrapper([1] * 5)
         >>> d = {source_dp1: 99999999, source_dp2: 0.0000001}
         >>> sample_mul_dp = SampleMultiplexer(pipes_to_weights_dict=d, seed=0)
        >>> list(sample_mul_dp)
diff --git a/torchdata/datapipes/iter/util/saver.py b/torchdata/datapipes/iter/util/saver.py
index 0c8332c15..0c93920ae 100644
--- a/torchdata/datapipes/iter/util/saver.py
+++ b/torchdata/datapipes/iter/util/saver.py
@@ -27,16 +27,25 @@ class SaverIterDataPipe(IterDataPipe[str]):
         filepath_fn: Function that takes in metadata and returns the target path of the new file
 
     Example:
-        >>> from torchdata.datapipes.iter import IterableWrapper
-        >>> import os
-        >>> def filepath_fn(name: str) -> str:
-        >>>     return os.path.join(".", os.path.basename(name))
-        >>> name_to_data = {"1.txt": b"DATA1", "2.txt": b"DATA2", "3.txt": b"DATA3"}
-        >>> source_dp = IterableWrapper(sorted(name_to_data.items()))
-        >>> saver_dp = source_dp.save_to_disk(filepath_fn=filepath_fn, mode="wb")
-        >>> res_file_paths = list(saver_dp)
-        >>> res_file_paths
+
+        .. testcode::
+            :skipif: io_doctest
+
+            import os
+
+            def filepath_fn(name: str) -> str:
+                return os.path.join(".", os.path.basename(name))
+
+            name_to_data = {"1.txt": b"DATA1", "2.txt": b"DATA2", "3.txt": b"DATA3"}
+            source_dp = IterableWrapper(sorted(name_to_data.items()))
+            saver_dp = source_dp.save_to_disk(filepath_fn=filepath_fn, mode="wb")
+            print(list(saver_dp))
+
+        .. testoutput::
+            :skipif: io_doctest
+
+            ['./1.txt', './2.txt', './3.txt']
+
     """
 
     def __init__(
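The saver example writes `1.txt`, `2.txt`, and `3.txt` into the working directory, which is one reason both of its
blocks are gated behind `io_doctest`. If the example were enabled, sphinx's `testcleanup` directive could remove the
files afterwards. A sketch, not part of the patch:

```rst
.. testcleanup::

    import os

    # Delete the files created by the save_to_disk example above.
    for name in ["1.txt", "2.txt", "3.txt"]:
        if os.path.exists(name):
            os.remove(name)
```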
diff --git a/torchdata/datapipes/iter/util/shardexpander.py b/torchdata/datapipes/iter/util/shardexpander.py
index 8a3c1fc82..00145dff5 100644
--- a/torchdata/datapipes/iter/util/shardexpander.py
+++ b/torchdata/datapipes/iter/util/shardexpander.py
@@ -64,7 +64,7 @@ class ShardExpanderIterDataPipe(IterDataPipe[str]):
         >>> expand_dp = source_dp.shard_expand()
         >>> list(expand_dp)
         ['ds-00.tar', 'ds-01.tar', 'ds-02.tar', 'ds-03.tar', 'ds-04.tar', 'ds-05.tar']
-        >>> source_dp = IterableWrapper(["imgs_{00..05}.tar", "labels_{00..05}.tar"])
+        >>> source_dp = IterableWrapper(["imgs_{00..02}.tar", "labels_{00..02}.tar"])
         >>> expand_dp = source_dp.shard_expand()
         >>> list(expand_dp)
         ['imgs_00.tar', 'imgs_01.tar', 'imgs_02.tar', 'labels_00.tar', 'labels_01.tar', 'labels_02.tar']
diff --git a/torchdata/datapipes/iter/util/tararchiveloader.py b/torchdata/datapipes/iter/util/tararchiveloader.py
index e7cff2b9d..3e1fc44ef 100644
--- a/torchdata/datapipes/iter/util/tararchiveloader.py
+++ b/torchdata/datapipes/iter/util/tararchiveloader.py
@@ -35,13 +35,19 @@ class TarArchiveLoaderIterDataPipe(IterDataPipe[Tuple[str, BufferedIOBase]]):
         or let Python's GC close them periodically.
 
     Example:
-        >>> from torchdata.datapipes.iter import FileLister, FileOpener
-        >>> datapipe1 = FileLister(".", "*.tar")
-        >>> datapipe2 = FileOpener(datapipe1, mode="b")
-        >>> tar_loader_dp = datapipe2.load_from_tar()
-        >>> for _, stream in tar_loader_dp:
-        >>>     print(stream.read())
+
+        .. testcode::
+
+            files_dp = FileLister(".", "*.tar").open_files(mode="b")
+            tar_loader_dp = files_dp.load_from_tar()
+            for _, stream in tar_loader_dp:
+                print(stream.read())
+
+        .. testoutput::
+            :skipif: io_doctest
+
+            b'0123456789abcdef'
+
     """
 
     def __init__(self, datapipe: Iterable[Tuple[str, BufferedIOBase]], mode: str = "r:*", length: int = -1) -> None:
diff --git a/torchdata/datapipes/iter/util/tfrecordloader.py b/torchdata/datapipes/iter/util/tfrecordloader.py
index 8ad5ea101..d21923e81 100644
--- a/torchdata/datapipes/iter/util/tfrecordloader.py
+++ b/torchdata/datapipes/iter/util/tfrecordloader.py
@@ -210,12 +210,14 @@ class TFRecordLoaderIterDataPipe(IterDataPipe[TFRecordExample]):
         or let Python's GC close them periodically.
 
     Example:
-        >>> from torchdata.datapipes.iter import FileLister, FileOpener
-        >>> datapipe1 = FileLister(".", "*.tfrecord")
-        >>> datapipe2 = FileOpener(datapipe1, mode="b")
-        >>> tfrecord_loader_dp = datapipe2.load_from_tfrecord()
-        >>> for example in tfrecord_loader_dp:
-        >>>     print(example)
+
+        .. testcode::
+
+            files_dp = FileLister(".", "*.tfrecord").open_files(mode="b")
+            tfrecord_loader_dp = files_dp.load_from_tfrecord()
+            for example in tfrecord_loader_dp:
+                print(example)
+
     """
 
     def __init__(
diff --git a/torchdata/datapipes/iter/util/webdataset.py b/torchdata/datapipes/iter/util/webdataset.py
index 164c9151e..0fb397f99 100644
--- a/torchdata/datapipes/iter/util/webdataset.py
+++ b/torchdata/datapipes/iter/util/webdataset.py
@@ -61,20 +61,21 @@ class WebDatasetIterDataPipe(IterDataPipe[Dict]):
         a DataPipe yielding a stream of dictionaries
 
     Examples:
-        >>> from torchdata.datapipes.iter import FileLister, FileOpener
-        >>>
-        >>> def decode(item):
-        >>>     key, value = item
-        >>>     if key.endswith(".txt"):
-        >>>         return key, value.read().decode("utf-8")
-        >>>     if key.endswith(".bin"):
-        >>>         return key, value.read().decode("utf-8")
-        >>>
-        >>> datapipe1 = FileLister("test/_fakedata", "wds*.tar")
-        >>> datapipe2 = FileOpener(datapipe1, mode="b")
-        >>> dataset = datapipe2.load_from_tar().map(decode).webdataset()
-        >>> for obj in dataset:
-        >>>     print(obj)
+
+        .. testcode::
+
+            def decode(item):
+                key, value = item
+                if key.endswith(".txt"):
+                    return key, value.read().decode("utf-8")
+                if key.endswith(".bin"):
+                    return key, value.read().decode("utf-8")
+
+            files_dp = FileLister(".", "*.tar").open_files(mode="b")
+            dataset = files_dp.load_from_tar().map(decode).webdataset()
+            for obj in dataset:
+                print(obj)
+
     """
 
     def __init__(self, source_datapipe: IterDataPipe[List[Union[Dict, List]]]) -> None:
diff --git a/torchdata/datapipes/iter/util/xzfileloader.py b/torchdata/datapipes/iter/util/xzfileloader.py
index 9224fc8de..a1090ec66 100644
--- a/torchdata/datapipes/iter/util/xzfileloader.py
+++ b/torchdata/datapipes/iter/util/xzfileloader.py
@@ -33,13 +33,21 @@ class XzFileLoaderIterDataPipe(IterDataPipe[Tuple[str, BufferedIOBase]]):
         or let Python's GC close them periodically.
 
     Example:
-        >>> from torchdata.datapipes.iter import FileLister, FileOpener
-        >>> datapipe1 = FileLister(".", "*.xz")
-        >>> datapipe2 = FileOpener(datapipe1, mode="b")
-        >>> xz_loader_dp = datapipe2.load_from_xz()
-        >>> for _, stream in xz_loader_dp:
-        >>>     print(stream.read())
+
+        .. testcode::
+
+            from torchdata.datapipes.iter import FileLister
+
+            datapipe1 = FileLister(".", "*.xz")
+            datapipe2 = datapipe1.open_files(mode="b")
+            xz_loader_dp = datapipe2.load_from_xz()
+            for _, stream in xz_loader_dp:
+                print(stream.read())
+
+        .. testoutput::
+            :skipif: io_doctest
+
+            b'0123456789abcdef'
+
     """
 
     def __init__(self, datapipe: Iterable[Tuple[str, BufferedIOBase]], length: int = -1) -> None:
diff --git a/torchdata/datapipes/iter/util/zip_longest.py b/torchdata/datapipes/iter/util/zip_longest.py
index 3a8f4b982..b637333d6 100644
--- a/torchdata/datapipes/iter/util/zip_longest.py
+++ b/torchdata/datapipes/iter/util/zip_longest.py
@@ -26,7 +26,7 @@ class ZipperLongestIterDataPipe(IterDataPipe):
         >>> dp1, dp2, dp3 = IterableWrapper(range(3)), IterableWrapper(range(10, 15)), IterableWrapper(range(20, 25))
         >>> list(dp1.zip_longest(dp2, dp3))
         [(0, 10, 20), (1, 11, 21), (2, 12, 22), (None, 13, 23), (None, 14, 24)]
-        >>> list(dp1.zip_longest(dp2, dp3, -1))
+        >>> list(dp1.zip_longest(dp2, dp3, fill_value=-1))
         [(0, 10, 20), (1, 11, 21), (2, 12, 22), (-1, 13, 23), (-1, 14, 24)]
     """
 
     datapipes: Tuple[IterDataPipe]
diff --git a/torchdata/datapipes/iter/util/ziparchiveloader.py b/torchdata/datapipes/iter/util/ziparchiveloader.py
index d70a902d3..603c905ec 100644
--- a/torchdata/datapipes/iter/util/ziparchiveloader.py
+++ b/torchdata/datapipes/iter/util/ziparchiveloader.py
@@ -35,12 +35,19 @@ class ZipArchiveLoaderIterDataPipe(IterDataPipe[Tuple[str, BufferedIOBase]]):
         the data_stream variable below cannot be closed within the scope of this function.
 
     Example:
-        >>> from torchdata.datapipes.iter import FileLister, FileOpener
-        >>> datapipe1 = FileLister(".", "*.zip")
-        >>> datapipe2 = FileOpener(datapipe1, mode="b")
-        >>> zip_loader_dp = datapipe2.load_from_zip()
-        >>> for _, stream in zip_loader_dp:
-        >>>     print(stream.read())
+
+        .. testcode::
+
+            from torchdata.datapipes.iter import FileLister
+
+            datapipe1 = FileLister(".", "*.zip")
+            datapipe2 = datapipe1.open_files(mode="b")
+            zip_loader_dp = datapipe2.load_from_zip()
+            for _, stream in zip_loader_dp:
+                print(stream.read())
+
+        .. testoutput::
+            :skipif: io_doctest
+
+            b'0123456789abcdef'
     """