-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathrun_pipeline.py
More file actions
321 lines (255 loc) · 12.7 KB
/
run_pipeline.py
File metadata and controls
321 lines (255 loc) · 12.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
多因子策略完整流水线
整合数据获取、数据加工、因子衍生、多因子策略四个模块
"""
import argparse
import logging
from datetime import datetime
# 导入各个模块
from data_acquisition.data_fetcher import BaoStockDataFetcher
from data_acquisition.batch_processor import BatchProcessor
from data_processing.dwd_processor import DWDProcessor
from data_processing.base_factor_processor import BaseFactorProcessor
from factor_derivation.factor_generation_fixed import FactorGeneratorFixed
from factor_derivation.factor_preprocessor import FactorPreprocessor
from factor_analysis.simple_factor_analyzer import SimpleFactorAnalyzer
from multi_factor_strategy.multi_factor_strategy_fixed import MultiFactorStrategyFixed
from multi_factor_strategy.optimized_multi_factor_strategy import OptimizedMultiFactorStrategy
from database.manager_fixed import DatabaseManagerFixed
class MultiFactorPipeline:
"""多因子策略完整流水线"""
def __init__(self):
"""初始化流水线"""
self.db_manager = DatabaseManagerFixed()
self.data_fetcher = BaoStockDataFetcher()
self.batch_processor = BatchProcessor()
self.dwd_processor = DWDProcessor()
self.base_factor_processor = BaseFactorProcessor()
self.factor_generator = FactorGeneratorFixed()
self.factor_preprocessor = FactorPreprocessor()
self.single_factor_analyzer = SimpleFactorAnalyzer()
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
self.logger = logging.getLogger(__name__)
def run_data_acquisition(self, start_date: str, end_date: str):
"""运行数据获取模块"""
self.logger.info("=== 开始数据获取 ===")
try:
# 获取HS300成分股
self.logger.info("获取HS300成分股数据")
self.data_fetcher.get_hs300_stocks()
# 批量获取股票数据
self.logger.info("批量获取股票数据")
self.batch_processor.update_all_stocks(start_date, end_date)
self.logger.info("数据获取完成")
except Exception as e:
self.logger.error(f"数据获取失败: {str(e)}")
raise
def run_data_processing(self, start_date: str = '2020-06-01'):
"""运行数据加工模块"""
self.logger.info("=== 开始数据加工 ===")
try:
# 处理DWD层数据
self.logger.info("处理DWD层数据")
self.dwd_processor.process_all_tables()
# 构建基础因子表
self.logger.info("构建基础因子表")
self.base_factor_processor.create_base_factor_table(start_date)
self.logger.info("数据加工完成")
except Exception as e:
self.logger.error(f"数据加工失败: {str(e)}")
raise
def run_factor_derivation(self, start_date: str, end_date: str):
"""运行因子衍生模块"""
self.logger.info("=== 开始因子衍生 ===")
try:
# 获取基础因子数据
self.logger.info("获取基础因子数据")
df = self.factor_generator.get_base_factor_data(start_date, end_date)
# 生成各种因子
self.logger.info("生成技术因子")
df = self.factor_generator.generate_technical_factors_optimized(df)
self.logger.info("生成基本面因子")
df = self.factor_generator.generate_fundamental_factors_optimized(df)
self.logger.info("生成横截面因子")
df = self.factor_generator.generate_cross_sectional_factors_optimized(df)
# 保存因子数据
self.logger.info("保存因子数据到数据库")
self.factor_generator.save_factors_to_database_safe(df)
self.logger.info("因子衍生完成")
except Exception as e:
self.logger.error(f"因子衍生失败: {str(e)}")
raise
def run_factor_preprocessing(self, start_date: str, end_date: str,
table_name: str = 'dws_stock_factors',
output_table_name: str = None):
"""运行因子预处理模块"""
self.logger.info("=== 开始因子预处理 ===")
try:
# 运行完整的因子预处理流程
self.factor_preprocessor.run_full_preprocessing(
start_date=start_date,
end_date=end_date,
table_name=table_name,
output_table_name=output_table_name,
missing_method='forward_fill',
winsorize_method='quantile',
winsorize_limits=(0.01, 0.99),
standardize_method='zscore',
neutralize=True,
ic_analysis=True
)
self.logger.info("因子预处理完成")
except Exception as e:
self.logger.error(f"因子预处理失败: {str(e)}")
raise
def run_multi_factor_strategy(self, start_date: str, end_date: str,
strategy_type: str = 'optimized', **kwargs):
"""运行多因子策略模块"""
self.logger.info("=== 开始多因子策略 ===")
try:
if strategy_type == 'fixed':
strategy = MultiFactorStrategyFixed(
start_date=start_date,
end_date=end_date,
**kwargs
)
else: # optimized
strategy = OptimizedMultiFactorStrategy(
start_date=start_date,
end_date=end_date,
**kwargs
)
strategy.run_optimized_strategy() if strategy_type == 'optimized' else strategy.run_strategy()
self.logger.info("多因子策略完成")
except Exception as e:
self.logger.error(f"多因子策略失败: {str(e)}")
raise
def run_single_factor_analysis(self, start_date: str, end_date: str,
table_name: str = 'dws_stock_factors_preprocessed',
quantiles: int = 5, max_factors: int = None,
save_plots: bool = True, output_dir: str = "factor_analysis_plots"):
"""运行单因子分析"""
self.logger.info("=== 开始单因子分析 ===")
try:
# 运行单因子分析
results = self.single_factor_analyzer.analyze_all_factors(
start_date=start_date,
end_date=end_date,
table_name=table_name,
quantiles=quantiles,
max_factors=max_factors,
save_plots=save_plots,
output_dir=output_dir
)
# 不再生成MD报告,HTML报告已自动生成
self.logger.info("单因子分析完成")
return results
except Exception as e:
self.logger.error(f"单因子分析失败: {str(e)}")
raise
def run_full_pipeline(self, start_date: str, end_date: str,
strategy_type: str = 'optimized', **kwargs):
"""运行完整流水线"""
self.logger.info("开始运行完整多因子策略流水线")
try:
# 1. 数据获取
self.run_data_acquisition(start_date, end_date)
# 2. 数据加工
self.run_data_processing()
# 3. 因子衍生
self.run_factor_derivation(start_date, end_date)
# 4. 因子预处理
self.run_factor_preprocessing(start_date, end_date)
# 5. 多因子策略
self.run_multi_factor_strategy(start_date, end_date, strategy_type, **kwargs)
self.logger.info("完整流水线运行成功!")
except Exception as e:
self.logger.error(f"流水线运行失败: {str(e)}")
raise
finally:
self.close()
def close(self):
"""关闭所有连接"""
self.db_manager.close()
self.data_fetcher.close()
self.factor_generator.close()
self.factor_preprocessor.close()
self.single_factor_analyzer.close()
def main():
"""主函数"""
parser = argparse.ArgumentParser(description='多因子策略完整流水线')
# 基本参数
parser.add_argument('--start-date', default='2020-06-01', help='开始日期')
parser.add_argument('--end-date', default='2020-12-31', help='结束日期')
parser.add_argument('--strategy-type', choices=['fixed', 'optimized'],
default='optimized', help='策略类型')
# 策略参数
parser.add_argument('--rebalance-freq', type=int, default=10, help='调仓频率')
parser.add_argument('--top-n', type=int, default=50, help='选股数量')
parser.add_argument('--min-score', type=float, default=0.0, help='最小因子得分')
# 流水线步骤
parser.add_argument('--data-acquisition', action='store_true', help='只运行数据获取')
parser.add_argument('--data-processing', action='store_true', help='只运行数据加工')
parser.add_argument('--factor-derivation', action='store_true', help='只运行因子衍生')
parser.add_argument('--factor-preprocessing', action='store_true', help='只运行因子预处理')
parser.add_argument('--single-factor-analysis', action='store_true', help='只运行单因子分析')
parser.add_argument('--multi-factor-strategy', action='store_true', help='只运行多因子策略')
parser.add_argument('--full-pipeline', action='store_true', help='运行完整流水线')
# 单因子分析参数
parser.add_argument('--table-name', default='dws_stock_factors_preprocessed', help='因子表名')
parser.add_argument('--quantiles', type=int, default=5, help='分层数量')
parser.add_argument('--max-factors', type=int, help='最大分析因子数量')
parser.add_argument('--save-plots', action='store_true', help='保存alphalens图表')
parser.add_argument('--output-dir', default='factor_analysis_plots', help='图表输出目录')
args = parser.parse_args()
# 创建流水线
pipeline = MultiFactorPipeline()
try:
if args.full_pipeline or not any([args.data_acquisition, args.data_processing,
args.factor_derivation, args.factor_preprocessing,
args.single_factor_analysis, args.multi_factor_strategy]):
# 运行完整流水线
pipeline.run_full_pipeline(
start_date=args.start_date,
end_date=args.end_date,
strategy_type=args.strategy_type,
rebalance_freq=args.rebalance_freq,
top_n=args.top_n,
min_score=args.min_score
)
else:
# 运行指定步骤
if args.data_acquisition:
pipeline.run_data_acquisition(args.start_date, args.end_date)
if args.data_processing:
pipeline.run_data_processing()
if args.factor_derivation:
pipeline.run_factor_derivation(args.start_date, args.end_date)
if args.factor_preprocessing:
pipeline.run_factor_preprocessing(args.start_date, args.end_date)
if args.single_factor_analysis:
pipeline.run_single_factor_analysis(
args.start_date, args.end_date, args.table_name,
quantiles=args.quantiles, max_factors=args.max_factors,
save_plots=args.save_plots, output_dir=args.output_dir
)
if args.multi_factor_strategy:
pipeline.run_multi_factor_strategy(
args.start_date, args.end_date, args.strategy_type,
rebalance_freq=args.rebalance_freq,
top_n=args.top_n,
min_score=args.min_score
)
print("\n🎉 流水线运行成功!")
except Exception as e:
print(f"\n❌ 流水线运行失败: {str(e)}")
import traceback
traceback.print_exc()
if __name__ == '__main__':
main()