forked from Bar0n624/easy_funds
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathget_funds.py
More file actions
231 lines (195 loc) · 7.16 KB
/
get_funds.py
File metadata and controls
231 lines (195 loc) · 7.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
import requests
import mysql.connector
from dateutil.relativedelta import relativedelta
from datetime import datetime
from collections import defaultdict
import json
import sys
import threading
from queue import Queue
import logging
# Emit INFO-and-above records, each prefixed with a timestamp.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(message)s")
# getLogger() with no name returns the root logger.
logger = logging.getLogger()
# Report URL template; the two %s slots are the from-date (frmdt) and
# to-date (todt) query parameters, filled in by request_url().
url = "https://portal.amfiindia.com/DownloadNAVHistoryReport_Po.aspx?frmdt=%s&todt=%s"
# NOTE(review): in the code visible here this queue is referenced only by the
# commented-out threaded pipeline near the end of the file.
data_queue = Queue()
def one_month_later_or_latest(date_str):
    """Return the end date of the one-month window that starts at *date_str*.

    The window end is computed as ``date_str + 1 month - 1 day``. When that
    moment lies after the current local time, the current date is returned
    instead. Input and output both use the ``%d-%b-%Y`` format
    (for example ``01-Jan-2024``).
    """
    date_format = "%d-%b-%Y"
    window_start = datetime.strptime(date_str, date_format)
    window_end = window_start + relativedelta(months=1) - relativedelta(days=1)
    # min() keeps window_end on a tie, matching a "<= today" comparison.
    capped_end = min(window_end, datetime.today())
    return capped_end.strftime(date_format)
def request_url(url, date, timeout=60):
    """Download the report for the one-month window starting at *date*.

    Args:
        url: Template containing two ``%s`` slots, filled with the window's
            start date and end date in that order.
        date: Window start in ``%d-%b-%Y`` format (e.g. ``01-Jan-2024``).
        timeout: Seconds to wait for the server to connect or to send data
            before giving up. Without it a stalled server would block the
            script indefinitely.

    Returns:
        The response body decoded as text.

    Raises:
        requests.RequestException: on connection failure, timeout, or when
            the server answers with an HTTP error status.
    """
    report_url = url % (date, one_month_later_or_latest(date))
    response = requests.get(report_url, timeout=timeout)
    # Fail loudly on 4xx/5xx rather than handing an error page to the parser.
    response.raise_for_status()
    return response.text
def parse(response):
    """Turn the semicolon-delimited report text into a list of records.

    Blank lines are dropped and the first remaining line (the column header)
    is skipped. After that, a line without any ``;`` is a heading: two
    headings in a row are read as ``category`` then ``company``, a lone
    heading replaces only ``company``. A line with exactly eight
    ``;``-separated fields becomes a record tagged with the current headings;
    lines with any other field count are ignored.

    Returns:
        A list of dicts with the keys ``category``, ``company``, ``name``,
        ``value`` and ``date``; all values are strings, except that a heading
        not seen yet is ``None``.
    """
    records = []
    current_category = None
    current_company = None

    stripped_lines = (raw.strip() for raw in response.strip().splitlines())
    rows = [text for text in stripped_lines if text]
    rows = rows[1:]  # discard the column-header line
    row_count = len(rows)

    pos = 0
    while pos < row_count:
        fields = rows[pos].split(";")

        if len(fields) > 1:
            # Data row: only the eight-column layout is understood.
            if len(fields) == 8:
                records.append(
                    {
                        "category": current_category,
                        "company": current_company,
                        "name": fields[1],
                        "value": fields[4],
                        "date": fields[7],
                    }
                )
            pos += 1
            continue

        # Heading row: peek ahead to see whether it is a category/company pair.
        followed_by_heading = pos + 1 < row_count and ";" not in rows[pos + 1]
        if followed_by_heading:
            current_category = rows[pos]
            current_company = rows[pos + 1]
            pos += 2
        else:
            current_company = rows[pos]
            pos += 1

    return records
def _lookup_id(cursor, table, id_column, name_column, name):
    """Return the id stored for *name* in *table*, or ``None`` if absent."""
    # Table and column names are literals supplied by this module, so
    # interpolating them is safe. The looked-up value comes from the
    # downloaded report and is therefore always bound as a parameter.
    cursor.execute(
        f"SELECT {id_column} FROM {table} WHERE {name_column} = %s", (name,)
    )
    row = cursor.fetchone()
    return None if row is None else row[0]


def _get_or_create_id(cursor, connection, table, id_column, name_column, name):
    """Return the id of *name* in *table*, inserting and committing it first if needed."""
    found = _lookup_id(cursor, table, id_column, name_column, name)
    if found is not None:
        return found
    cursor.execute(f"INSERT INTO {table} ({name_column}) VALUES (%s)", (name,))
    connection.commit()
    found = _lookup_id(cursor, table, id_column, name_column, name)
    if found is None:
        raise RuntimeError(f"{table}: row for {name!r} not readable after insert")
    return found


def batch_insert_data(data):
    """Store parsed report records in the ``fund`` MySQL database.

    For every record the fund is looked up in ``fund_name``; unknown funds
    are created together with any missing ``fund_company`` /
    ``fund_category`` rows. All prices are then written to ``fund_value`` in
    one ``INSERT IGNORE`` batch.

    Args:
        data: Sequence of dicts with the keys ``category``, ``company``,
            ``name``, ``value`` and ``date`` (``date`` in ``%d-%b-%Y``
            format), as produced by ``parse``.

    Raises:
        mysql.connector.Error: if connecting fails or a company, category or
            fund insert fails. A failure of the initial fund lookup is logged
            and that record skipped; a failure of the final batch insert is
            logged and not re-raised.
        ValueError: if a record's date does not match ``%d-%b-%Y``.
        RuntimeError: if a freshly inserted row cannot be read back.

    The database password is read from ``.passwd.txt`` in the current
    working directory. The cursor and connection are always closed, even
    when an error interrupts processing.
    """
    with open(".passwd.txt", "r") as file:
        passwd = file.read().strip()

    connection = mysql.connector.connect(
        host="bar0n.live", user="fund", password=passwd, database="fund"
    )
    try:
        cursor = connection.cursor(buffered=True)
        try:
            # Per-call caches so each name costs at most one lookup.
            company_ids = {}
            category_ids = {}
            fund_ids = {}
            batch_data = []
            total = len(data)

            for position, record in enumerate(data, start=1):
                print("Processed: ", position, "of", total, end="\r")

                # Apostrophes were always stripped from fund names before
                # storing; keep doing so, so existing rows still match.
                name = record["name"].replace("'", "")
                parsed_date = datetime.strptime(record["date"].strip(), "%d-%b-%Y")
                date = parsed_date.strftime("%Y-%m-%d %H:%M:%S")

                if name not in fund_ids:
                    try:
                        fund_id = _lookup_id(
                            cursor, "fund_name", "fund_id", "fund_name", name
                        )
                    except mysql.connector.Error as err:
                        logger.error("Error checking fund %s: %s", name, err)
                        continue

                    if fund_id is None:
                        # A heading missing from the report arrives as None.
                        # The former string-built SQL stored that as the text
                        # 'None'; str() keeps matching those rows.
                        company = str(record["company"])
                        category = str(record["category"])

                        if company not in company_ids:
                            company_ids[company] = _get_or_create_id(
                                cursor,
                                connection,
                                "fund_company",
                                "company_id",
                                "company_name",
                                company,
                            )
                        if category not in category_ids:
                            category_ids[category] = _get_or_create_id(
                                cursor,
                                connection,
                                "fund_category",
                                "category_id",
                                "category_name",
                                category,
                            )

                        cursor.execute(
                            "INSERT INTO fund_name (fund_name, company_id, category_id) "
                            "VALUES (%s, %s, %s)",
                            (name, company_ids[company], category_ids[category]),
                        )
                        connection.commit()
                        fund_id = _lookup_id(
                            cursor, "fund_name", "fund_id", "fund_name", name
                        )
                        if fund_id is None:
                            raise RuntimeError(
                                f"fund_name: row for {name!r} not readable after insert"
                            )

                    fund_ids[name] = fund_id

                batch_data.append((fund_ids[name], record["value"], date))

            try:
                cursor.executemany(
                    "INSERT IGNORE INTO fund_value (fund_id, price, date) VALUES (%s, %s, %s)",
                    batch_data,
                )
                # The full batch can be very large; keep it out of normal output.
                logger.debug("Batch rows: %s", batch_data)
                connection.commit()
                logger.info(
                    "Batch insert completed for %d records.", len(batch_data)
                )
            except mysql.connector.Error as err:
                logger.error("Error during batch insert: %s", err)
        finally:
            cursor.close()
    finally:
        connection.close()
# def process_month(month):
# try:
# response = request_url(url, month)
# parsed_data = parse(response)
# logger.info(f"Month {month} processed successfully.")
# data_queue.put(parsed_data)
# except Exception as e:
# logger.error(f"Error processing month {month}: {e}")
# def worker():
# while True:
# data = data_queue.get()
# if data is None:
# break
# batch_insert_data(data)
# data_queue.task_done()
# if __name__ == "__main__":
# months = sys.argv[1:]
# threads = []
# for month in months:
# t = threading.Thread(target=process_month, args=(month,))
# threads.append(t)
# t.start()
# for t in threads:
# t.join()
# num_workers = 4
# workers = []
# for _ in range(num_workers):
# worker_thread = threading.Thread(target=worker)
# workers.append(worker_thread)
# worker_thread.start()
# data_queue.join()
# for _ in range(num_workers):
# data_queue.put(None)
# for worker_thread in workers:
# worker_thread.join()
# logger.info("Data processing and insertion complete.")
if __name__ == "__main__":
    # Every command-line argument is the start date of one download window;
    # the windows are fetched, parsed and stored strictly one after another.
    months = sys.argv[1:]
    for month in months:
        report_text = request_url(url, month)
        records = parse(report_text)
        batch_insert_data(records)