|
1 | 1 | from datachain import C, DataChain |
2 | 2 | from datachain.lib.webdataset import process_webdataset |
3 | 3 | from datachain.lib.webdataset_laion import WDSLaion, process_laion_meta |
| 4 | +from datachain.sql.functions import path |
4 | 5 |
|
5 | | -wds = ( |
| 6 | +wds_images = ( |
6 | 7 | DataChain.from_storage("gs://datachain-demo/datacomp-small/shards") |
7 | | - .filter(C("file.name").glob("00000000.tar")) |
| 8 | + .filter(C("file.name").glob("000000[0-5]*.tar")) # from *00.tar to *59.tar |
8 | 9 | .settings(cache=True) |
9 | 10 | .gen(laion=process_webdataset(spec=WDSLaion), params="file") |
10 | | - .save() # materialize chain to avoid downloading data multiple times |
11 | 11 | ) |
12 | 12 |
|
13 | | -meta_pq = ( |
| 13 | +wds_with_pq = ( |
14 | 14 | DataChain.from_parquet("gs://datachain-demo/datacomp-small/metadata/0020f*.parquet") |
15 | | - .filter( |
16 | | - C("uid").in_(values[0] for values in wds.select("laion.json.uid").collect()) |
17 | | - ) |
18 | | - .map(stem=lambda file: file.get_file_stem(), params=["source.file"], output=str) |
19 | | - .save() |
| 15 | + .settings(cache=True) |
| 16 | + .merge(wds_images, on="uid", right_on="laion.json.uid", inner=True) |
| 17 | + .mutate(stem=path.file_stem(C("source.file.name"))) |
20 | 18 | ) |
21 | 19 |
|
22 | | -meta_emd = ( |
| 20 | +res = ( |
23 | 21 | DataChain.from_storage("gs://datachain-demo/datacomp-small/metadata/0020f*.npz") |
| 22 | + .settings(cache=True) |
24 | 23 | .gen(emd=process_laion_meta) |
25 | | - .filter( |
26 | | - C("emd.index").in_( |
27 | | - values[0] for values in meta_pq.select("source.index").collect() |
28 | | - ) |
| 24 | + .mutate(stem=path.file_stem(C("emd.file.name"))) |
| 25 | + .merge( |
| 26 | + wds_with_pq, |
| 27 | + on=["stem", "emd.index"], |
| 28 | + right_on=["stem", "source.index"], |
| 29 | + inner=True, |
29 | 30 | ) |
30 | | - .map(stem=lambda file: file.get_file_stem(), params=["emd.file"], output=str) |
| 31 | + .save("wds") |
31 | 32 | ) |
32 | 33 |
|
33 | | - |
34 | | -meta = meta_emd.merge( |
35 | | - meta_pq, |
36 | | - on=["stem", "emd.index"], |
37 | | - right_on=["stem", "source.index"], |
38 | | -) |
39 | | - |
40 | | -res = wds.merge(meta, on="laion.json.uid", right_on="uid") |
41 | | - |
42 | | -res.show(3) |
| 34 | +res.show(5) |
0 commit comments