Skip to content

Commit 2479387

Browse files
committed
Create fdb_deblaster.py
1 parent ee3e32d commit 2479387

File tree

1 file changed

+113
-0
lines changed

1 file changed

+113
-0
lines changed

test_scripts/fdb_deblaster.py

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
"""
2+
Convert fdb data with metadata from disk into a qube as fast as possible
3+
4+
Example of running this in parallel: ls test_scripts/data/*.zst | xargs -n 1 -P 10 python test_scripts/fdb_deblaster.py
5+
"""
6+
7+
import subprocess
8+
import sys
9+
from pathlib import Path
10+
from time import time
11+
12+
from qubed import Qube
13+
14+
# Request keys kept in the output qube, listed in the order they are passed to
# Qube.from_datacube. Keys not listed here (e.g. "year", "month") are dropped.
KEY_ORDER = (
    "class",
    "dataset",
    "stream",
    "activity",
    "resolution",
    "expver",
    "experiment",
    "generation",
    "model",
    "realization",
    "type",
    "date",
    "time",
    "datetime",
    "levtype",
    "levelist",
    "step",
    "param",
)


def parse_key_values(text: str, maxsplit: int = -1) -> dict[str, str]:
    """Parse a "k1=v1/k2=v2" string into a dict.

    Args:
        text: Slash-separated ``key=value`` pairs.
        maxsplit: Maximum number of "/" splits; the remainder stays in the
            last pair, so its value may itself contain "/".

    Returns:
        Mapping of each key to its value. Only the first "=" of a pair
        separates key from value, so values may contain "=".

    Raises:
        ValueError: If a pair contains no "=".
    """
    return dict(pair.split("=", 1) for pair in text.split("/", maxsplit))


def select_ordered_keys(keys: dict[str, str]) -> dict[str, str]:
    """Return the entries of *keys* named in KEY_ORDER, in KEY_ORDER order."""
    return {k: keys[k] for k in KEY_ORDER if k in keys}


def main(argv=None) -> None:
    """Convert one zstd-compressed fdb listing into a qube saved as JSON.

    Args:
        argv: Argument vector (program name followed by INPUT_PATH);
            defaults to ``sys.argv``.

    The input is decompressed next to the compressed file (unless the
    decompressed file already exists), parsed line by line, and the resulting
    qube is saved under ``test_scripts/qubes/``. Nothing is done when the
    output file already exists. The decompressed file is removed after a
    successful run.
    """
    args = sys.argv if argv is None else argv
    if len(args) != 2:
        print("Usage: python fdb_deblaster.py INPUT_PATH")
        sys.exit(1)  # non-zero so callers can detect the usage error

    # Explicit check instead of assert, which is stripped under `python -O`.
    if not args[1].endswith(".zst"):
        sys.exit(f"Expected a .zst file, got {args[1]}")

    p = Path(args[1])  # Compressed file
    decompressed = p.parent / p.stem  # decompressed file
    output = Path(f"test_scripts/qubes/{p.stem[:-5]}.json")

    if output.exists():
        sys.exit()

    if not decompressed.exists():
        # check=True: fail here rather than later with a missing-file error.
        subprocess.run(
            ["zstd", "--decompress", p], stdout=subprocess.PIPE, check=True
        )

    print(f"{output} does not exist")
    print(decompressed)

    one_count = 0
    two_count = 0

    level_one = {}
    level_two = {}
    path_meta = {}

    level_one_qube = Qube.empty()
    level_two_qube = Qube.empty()
    level_three_qube = Qube.empty()

    t0 = time()

    def leaves_per_second(lines_read: int) -> float:
        # Guard against a zero interval on clocks with coarse resolution.
        elapsed = time() - t0
        return lines_read / elapsed if elapsed > 0 else 0.0

    with decompressed.open() as f:
        # Iterate the file lazily instead of loading every line up front.
        for i, line in enumerate(f):
            stripped = line.strip()
            if not stripped:
                continue  # tolerate blank lines (e.g. a trailing newline)

            level, key, *metadata = stripped.split(" ")

            if level == "0":
                # A new level-one key starts: fold in what was collected so far.
                level_one_qube |= level_two_qube
                level_two_qube = Qube.empty()

                level_one = parse_key_values(key)
                one_count += 1
                print(
                    f"{one_count}th level one key, "
                    f"{leaves_per_second(i):.0f} leaves/s"
                )

            elif level == "1":
                # A new level-two key starts: attach the previous group's path
                # metadata to its leaves before replacing path_meta below.
                level_two_qube |= level_three_qube.add_metadata(path_meta)
                level_three_qube = Qube.empty()

                level_two = parse_key_values(key)
                path_meta = parse_key_values(metadata[0], 3)
                two_count += 1
                print(
                    f"{two_count}th level two key, "
                    f"{leaves_per_second(i):.0f} leaves/s"
                )

            # NOTE(review): leaves are tagged "3" although the other levels are
            # "0" and "1" -- confirm against the dump format.
            elif level == "3":
                level_three = parse_key_values(key)
                offset_length_meta = parse_key_values(metadata[0])

                # Filtering by KEY_ORDER also drops "year" and "month".
                keys = select_ordered_keys(level_one | level_two | level_three)

                level_three_qube |= Qube.from_datacube(keys).add_metadata(
                    offset_length_meta
                )

    # Flush the groups still pending at end of file; without this the last
    # level-two group (and everything under the last level-one key) is lost.
    level_two_qube |= level_three_qube.add_metadata(path_meta)
    level_one_qube |= level_two_qube

    output.parent.mkdir(parents=True, exist_ok=True)
    level_one_qube.save(str(output))

    if decompressed.exists():
        print(f"Removing {decompressed}")
        decompressed.unlink()


if __name__ == "__main__":
    main()

0 commit comments

Comments
 (0)