1+ """
2+ :file: 02_Hinet_Download_Queue.py
3+ :author: Zhu Dengda ([email protected] ) 4+ :date: 2023-04
5+
6+ 基于HinetPy下载Hinet官网的地震数据,
7+
8+ 实际手动下载会发现,在你本地网速正常的情况下,当你在Hinet官网发出下载请求后
9+ 大部分事件是在等待Hinet服务器整理数据,然后才能下载。
10+
11+ 该脚本通过使用多账号,将下载任务以队列的形式进行多进程并行下载,提高下载效率
12+
13+ """
14+
15+
16+ import os
17+ import numpy as np
18+ from HinetPy import Client
19+ from datetime import datetime
20+ import shutil
21+ import multiprocessing as mp
22+ from typing import List
23+ import pandas as pd
24+ import tqdm
25+ from copy import deepcopy
26+ import fnmatch
27+
28+
29+ def log (txt : str ):
30+ string = f"[{ datetime .today ()} ] { txt } "
31+ print (string )
32+
33+ def get_free_GB (path :str ):
34+ '''获得当前路径下磁盘的可用空间,返回GB'''
35+ total , used , free = shutil .disk_usage (path )
36+ return free / (1024 ** 3 )
37+
38+ def download_data (task_queue :mp .Queue , user :str , passwd :str , savedirlist :List [str ],
39+ minfreeGB :float , span :float , netcode :str ):
40+ '''每个进程的任务函数'''
41+ client = Client (user , passwd , retries = 5 , max_sleep_count = 60 , sleep_time_in_seconds = 3 )
42+ savedir = savedirlist .pop (0 )
43+
44+ # 切换到一个临时目录,防止各进程的中间文件互相干扰
45+ tmp_wdir = f".tmp_{ user } "
46+ if os .path .exists (tmp_wdir ):
47+ shutil .rmtree (tmp_wdir )
48+ os .makedirs (tmp_wdir , exist_ok = True )
49+ os .chdir (tmp_wdir )
50+
51+ while True :
52+ jpcode :str = task_queue .get ()
53+ if jpcode is None : # 结束信号
54+ break
55+
56+ codesavedir = os .path .join (savedir , jpcode )
57+
58+ # 根据可用空间判断是否需要更换存储目录
59+ freeGB = get_free_GB (savedir )
60+ if freeGB <= minfreeGB :
61+ log (f"{ user } : { savedir } free space has only { freeGB :.2f} GB, savedir need to be changed." )
62+ if len (savedirlist )> 0 :
63+ savedir = savedirlist .pop (0 )
64+ else :
65+ while True :
66+ try :
67+ savedir = input ("Enter a new savedir: " )
68+ os .makedirs (savedir , exist_ok = True )
69+ except :
70+ print ("wrong input, try again." )
71+ continue
72+ break
73+ os .makedirs (savedir , exist_ok = True )
74+
75+ log (f"{ user } : savedir was changed to { savedir } ." )
76+
77+
78+ try :
79+ log (f"{ user } : { jpcode } downloading..." )
80+ _ = client .get_continuous_waveform (
81+ netcode ,
82+ jpcode ,
83+ span ,
84+ outdir = jpcode , # 由一些库历史问题(HinetPy中使用了os.rename函数,其兼容性不好),这里先保存到本地,然后再移动
85+ )
86+ # 再移动到目标目录
87+ shutil .move (jpcode , codesavedir )
88+ # time.sleep(1)
89+ except Exception as e :
90+ log (f"{ user } : { jpcode } Exception: { str (e )} " )
91+ continue
92+
93+ if os .path .exists (codesavedir ):
94+ log (f"{ user } : { jpcode } download successfully, directory: { savedir } , free space: { freeGB :.2f} GB" )
95+ else :
96+ log (f"{ user } : { jpcode } download failed." )
97+
98+ os .chdir (".." )
99+ shutil .rmtree (tmp_wdir )
100+
101+
102+ def update_mission (savedirlist :List [str ], catLst :List [str ]):
103+ # 根据保存目录中已有的文件夹名,对待下载任务做筛选
104+ for INPUT in savedirlist :
105+ Lst = os .listdir (INPUT )
106+ for code in tqdm .tqdm (Lst , desc = f"{ INPUT } mission updating" ):
107+ if not os .path .isdir (os .path .join (INPUT , code )):
108+ continue
109+
110+ try :
111+ catLst .remove (code )
112+ # tqdm.tqdm.write(f"found existing code {code}, skipped.")
113+ continue
114+ except :
115+ pass
116+
117+
118+ def run (cfgs :dict , pattern :str , catalog_csv :str , savedirlist :List [str ], updatesavedirlist :List [str ], minfreeGB :float , update :bool ):
119+ # =====================================
120+ # 设置基本参数
121+ # =====================================
122+ savedirlist_bak = deepcopy (savedirlist )
123+ if update :
124+ savedirlist = updatesavedirlist
125+
126+ # 转为绝对路径
127+ savedirlist = [os .path .abspath (p ) for p in savedirlist ]
128+ # 多个账号和密码
129+ userinfoLst = []
130+ for user in cfgs ['accounts' ]:
131+ userinfoLst .append ([user ['username' ], user ['password' ]])
132+
133+ # 下载波形时长,分钟
134+ span = cfgs ['span' ]
135+ netcode = cfgs ['netcode' ]
136+ os .makedirs (savedirlist [0 ], exist_ok = True )
137+
138+ # 读取事件目录
139+ catLst = pd .read_csv (catalog_csv , usecols = ['code' ], dtype = str ).to_dict ('list' )['code' ]
140+ catLst = [s for s in catLst if fnmatch .fnmatch (s , pattern )]
141+
142+ # 去重,排序
143+ catLst = list (set (catLst ))
144+ catLst .sort ()
145+ if update :
146+ update_mission (savedirlist_bak , catLst )
147+
148+
149+ # 创建队列,并添加任务
150+ log (f"read in { len (catLst )} events." )
151+ task_queue = mp .Queue ()
152+ for orig in catLst :
153+ task_queue .put (orig ) # hinet下载开始时刻只允许精确到分钟
154+
155+ # 启动多个进程
156+ processes = []
157+ for i in range (len (userinfoLst )):
158+ user , passwd = userinfoLst [i ]
159+ p = mp .Process (target = download_data ,
160+ args = (task_queue , user , passwd , savedirlist .copy (),
161+ minfreeGB , span , netcode ))
162+ p .start ()
163+ processes .append (p )
164+
165+ # 发送结束信号
166+ for _ in processes :
167+ task_queue .put (None )
168+
169+ # 等待所有任务完成
170+ task_queue .close ()
171+ task_queue .join_thread ()
172+
173+ for p in processes :
174+ p .join ()
175+
176+
177+ log (f"ALL DONE. " )
178+
179+
180+ if __name__ == '__main__' :
181+ import argparse
182+ import yaml
183+ parser = argparse .ArgumentParser ()
184+ parser .add_argument ("configpath" )
185+ args = parser .parse_args ()
186+ configpath = args .configpath
187+
188+ with open (configpath , "r" ) as f :
189+ CFGS = yaml .safe_load (f )
190+ cfgs = CFGS ['Waveform' ]['hinetkeys' ]
191+ catalog_csv = CFGS ['catalog_csv' ]
192+ # 多个本地下载路径,防止磁盘空间不够
193+ savedirlist = CFGS ['waveform_dirs' ]
194+ updatesavedirlist = CFGS ['Waveform' ]['update_waveform_dirs' ]
195+ # 磁盘最小可容许存储空间GB,此时切换下载路径
196+ minfreeGB = CFGS ['Waveform' ]['minfreeGB' ]
197+ # 是否以更新模式下载
198+ update = CFGS ['Waveform' ]['update' ]
199+ # 通配符
200+ pattern = CFGS ['Waveform' ]['pattern' ]
201+
202+ run (cfgs , pattern , catalog_csv , savedirlist , updatesavedirlist , minfreeGB , update )
0 commit comments