-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathimport_csv_to_db.py
More file actions
243 lines (209 loc) · 9.25 KB
/
import_csv_to_db.py
File metadata and controls
243 lines (209 loc) · 9.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
将A08.csv文件导入到MySQL数据库中
"""
import os
import pandas as pd
import pymysql
import logging
import numpy as np
import sys
# Logging setup for the whole script: INFO and above, each record prefixed
# with timestamp, logger name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
# Absolute path of the directory containing this script; data files are
# resolved relative to it.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# MySQL connection settings, passed as keyword arguments to pymysql.connect().
# Every value except the charset can be overridden through an environment
# variable, so real credentials do not have to live in source control.
# The defaults are the previous hard-coded values.
# NOTE(review): newer PyMySQL releases prefer 'password'/'database' over
# 'passwd'/'db'; the old keys are kept here in case other code reads them.
DB_CONFIG = {
    'host': os.environ.get('MYSQL_HOST', 'localhost'),
    'port': int(os.environ.get('MYSQL_PORT', '3306')),
    'user': os.environ.get('MYSQL_USER', 'root'),
    'passwd': os.environ.get('MYSQL_PASSWORD', 'root'),
    'db': os.environ.get('MYSQL_DATABASE', 'medical'),
    'charset': 'utf8mb4',
}
def get_conn():
    """Open a new PyMySQL connection using ``DB_CONFIG`` and return it.

    A success message is logged at INFO level. If connecting fails, the
    error is logged and the original exception propagates unchanged.
    """
    try:
        connection = pymysql.connect(**DB_CONFIG)
    except Exception as exc:
        logger.error(f"数据库连接失败: {str(exc)}")
        raise
    else:
        logger.info("数据库连接成功")
        return connection
def create_table():
    """Drop and recreate the ``a08_data`` table.

    Any existing ``a08_data`` table is dropped (discarding its rows), then an
    empty table with the schema below is created. The connection from
    ``get_conn()`` and its cursor are always closed, even when a statement
    fails.

    Raises:
        Exception: any error from ``get_conn()`` or from executing the DDL;
            it is logged and re-raised unchanged.
    """
    # Drop the existing table so the schema below is always applied fresh.
    drop_table_sql = """
DROP TABLE IF EXISTS a08_data;
"""
    # Create the new table; the column COMMENTs carry Chinese field descriptions.
    create_table_sql = """
CREATE TABLE a08_data (
id INT AUTO_INCREMENT PRIMARY KEY,
patient_id VARCHAR(50),
result INT,
month_total_amount FLOAT COMMENT '月统筹金额',
month_medicine_amount FLOAT COMMENT '月药品金额',
approval_amount FLOAT COMMENT '本次审批金额',
month_visit_count INT COMMENT '月就诊次数',
above_standard_amount FLOAT COMMENT '起付标准以上自负比例金额',
non_account_payment FLOAT COMMENT '非账户支付金额',
personal_account_amount FLOAT COMMENT '个人账户金额',
available_reimbursement FLOAT COMMENT '可用账户报销金额',
overall_payment FLOAT COMMENT '统筹支付金额',
sequence_number INT COMMENT '顺序号',
medicine_cost FLOAT COMMENT '药品费发生金额',
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
"""
    try:
        conn = get_conn()
        try:
            # The cursor context manager closes the cursor even if execute() raises.
            with conn.cursor() as cursor:
                cursor.execute(drop_table_sql)
                cursor.execute(create_table_sql)
                logger.info("表结构创建成功")
            conn.commit()
        finally:
            # Previously the connection leaked whenever a statement failed.
            conn.close()
    except Exception as e:
        logger.error(f"创建表结构失败: {str(e)}")
        raise
# CSV header prefixes feeding the a08_data value columns, in the same order
# as the value columns of the INSERT statement in import_data().
# NOTE(review): each prefix selects the FIRST CSV header starting with it, so
# e.g. 'L' would also match any other header beginning with 'L'. Confirm the
# real A08.csv headers are unambiguous.
_A08_COLUMN_PREFIXES = (
    'K',   # month_total_amount
    'L',   # month_medicine_amount
    'O',   # approval_amount
    'U',   # month_visit_count
    'AG',  # above_standard_amount
    'AC',  # non_account_payment
    'AM',  # personal_account_amount
    'AP',  # available_reimbursement
    'BF',  # overall_payment
    'V',   # sequence_number
    'BK',  # medicine_cost
)


def _find_column(columns, prefix):
    """Return the first name in *columns* that starts with *prefix*, or None."""
    return next((col for col in columns if col.startswith(prefix)), None)


def _to_db_value(value):
    """Convert one DataFrame cell into a value suitable for a DB-API parameter.

    Missing values (NaN, None, NaT, pd.NA) become None so MySQL stores NULL.
    NumPy scalars are unwrapped to the equivalent built-in int/float/bool, so
    the driver sees plain Python numbers instead of NumPy types. Every other
    value is returned unchanged.
    """
    if pd.isna(value):
        return None
    if isinstance(value, np.generic):
        return value.item()
    return value


def _create_indexes(cursor):
    """Create the secondary indexes on a08_data using *cursor*.

    Best-effort: a failing CREATE INDEX is logged and the remaining indexes
    are still attempted; nothing is raised.
    """
    indexes = [
        "CREATE INDEX idx_result ON a08_data(result)",
        "CREATE INDEX idx_month_visit_count ON a08_data(month_visit_count)",
        "CREATE INDEX idx_month_total_amount ON a08_data(month_total_amount)"
    ]
    for index_sql in indexes:
        try:
            cursor.execute(index_sql)
            logger.info(f"索引创建成功: {index_sql}")
        except Exception as e:
            logger.error(f"创建索引失败: {str(e)}")


def import_data():
    """Load ``data/A08.csv`` (relative to ``BASE_DIR``) into ``a08_data``.

    The table is truncated first. Each CSV row is then inserted individually:
    - ``patient_id`` is ``"P<row index>"``.
    - ``result`` is the integer value of the ``RES`` label column.
    - The remaining columns take the first CSV header that matches each
      prefix in ``_A08_COLUMN_PREFIXES``, or NULL when no header matches.

    Rows that fail to convert or insert are logged and skipped. Work is
    committed every 1000 inserted rows and once at the end. Secondary
    indexes are then created on a best-effort basis. The connection and
    cursor are always closed.

    Raises:
        FileNotFoundError: if the CSV file does not exist.
        ValueError: if the CSV has no ``RES`` label column.
        Exception: any other error (reading the CSV, connecting, truncating,
            the final commit) is logged and re-raised.
    """
    insert_sql = """
INSERT INTO a08_data (
patient_id,
result,
month_total_amount,
month_medicine_amount,
approval_amount,
month_visit_count,
above_standard_amount,
non_account_payment,
personal_account_amount,
available_reimbursement,
overall_payment,
sequence_number,
medicine_cost
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
    label_col = 'RES'
    try:
        csv_path = os.path.join(BASE_DIR, 'data/A08.csv')
        if not os.path.exists(csv_path):
            logger.error(f"CSV文件不存在: {csv_path}")
            # Raise rather than return silently, so callers such as main()
            # do not report success when nothing was imported.
            raise FileNotFoundError(f"CSV文件不存在: {csv_path}")

        logger.info(f"开始读取CSV文件: {csv_path}")
        df = pd.read_csv(csv_path)
        logger.info(f"CSV文件读取成功,共{len(df)}行数据")

        if label_col not in df.columns:
            logger.error(f"文件中不存在标签列 {label_col}")
            raise ValueError(f"文件中不存在标签列 {label_col}")

        # Resolve source columns once; the headers are the same for every row.
        value_columns = [_find_column(df.columns, prefix) for prefix in _A08_COLUMN_PREFIXES]
        missing_prefixes = [
            prefix for prefix, col in zip(_A08_COLUMN_PREFIXES, value_columns) if col is None
        ]
        if missing_prefixes:
            logger.warning(f"CSV文件中未找到前缀为 {missing_prefixes} 的列,对应字段将写入NULL")

        conn = get_conn()
        try:
            with conn.cursor() as cursor:
                cursor.execute("TRUNCATE TABLE a08_data")

                inserted_count = 0
                logger.info("开始导入数据...")
                for index, row in df.iterrows():
                    try:
                        values = tuple(
                            _to_db_value(row[col]) if col is not None else None
                            for col in value_columns
                        )
                        cursor.execute(
                            insert_sql,
                            (f"P{index}", int(row[label_col])) + values,
                        )
                        inserted_count += 1
                        # Commit in batches of 1000 rows.
                        if inserted_count % 1000 == 0:
                            conn.commit()
                            logger.info(f"已导入 {inserted_count} 条数据")
                    except Exception as e:
                        # Per-row best effort: log the failing row and move on.
                        logger.error(f"导入第 {index} 行数据失败: {str(e)}")

                # Commit the final partial batch.
                conn.commit()
                logger.info(f"数据导入完成,共导入 {inserted_count} 条数据")

                logger.info("开始创建索引...")
                _create_indexes(cursor)
            conn.commit()
        finally:
            # Previously the connection leaked on any error after connecting.
            conn.close()
    except Exception as e:
        logger.error(f"数据导入过程出错: {str(e)}")
        raise
def main():
    """Run the whole import: check connectivity, recreate the table, load the CSV.

    Returns:
        bool: True if every step completed without raising. False otherwise;
        in that case the failure is logged together with its traceback.
    """
    try:
        # Fail fast if MySQL is unreachable. The probe connection is closed
        # right away instead of being leaked for the rest of the run.
        get_conn().close()
        create_table()
        import_data()
        logger.info("数据导入完成!")
        return True
    except Exception as e:
        # Top-level boundary: log with traceback and report failure to the caller.
        logger.exception(f"主程序执行失败: {str(e)}")
        return False
if __name__ == "__main__":
    # Map main()'s boolean outcome to the process exit status (0 = success, 1 = failure).
    sys.exit(0 if main() else 1)