forked from ZhuLinsen/daily_stock_analysis
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
882 lines (721 loc) · 31.4 KB
/
main.py
File metadata and controls
882 lines (721 loc) · 31.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
# -*- coding: utf-8 -*-
"""
===================================
A股自选股智能分析系统 - 主调度程序
===================================
职责:
1. 协调各模块完成股票分析流程
2. 实现低并发的线程池调度
3. 全局异常处理,确保单股失败不影响整体
4. 提供命令行入口
使用方式:
python main.py # 正常运行
python main.py --debug # 调试模式
python main.py --dry-run # 仅获取数据不分析
交易理念(已融入分析):
- 严进策略:不追高,乖离率 > 5% 不买入
- 趋势交易:只做 MA5>MA10>MA20 多头排列
- 效率优先:关注筹码集中度好的股票
- 买点偏好:缩量回踩 MA5/MA10 支撑
"""
import os
# 代理配置 - 仅在本地环境使用,GitHub Actions 不需要
if os.getenv("GITHUB_ACTIONS") != "true":
# 本地开发环境,如需代理请取消注释或修改端口
# os.environ["http_proxy"] = "http://127.0.0.1:10809"
# os.environ["https_proxy"] = "http://127.0.0.1:10809"
pass
import argparse
import logging
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, date, timezone, timedelta
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
from feishu_doc import FeishuDocManager
from config import get_config, Config
from storage import get_db, DatabaseManager
from data_provider import DataFetcherManager
from data_provider.akshare_fetcher import AkshareFetcher, RealtimeQuote, ChipDistribution
from analyzer import GeminiAnalyzer, AnalysisResult, STOCK_NAME_MAP
from notification import NotificationService, send_daily_report
from search_service import SearchService, SearchResponse
from stock_analyzer import StockTrendAnalyzer, TrendAnalysisResult
from market_analyzer import MarketAnalyzer
# 配置日志格式
LOG_FORMAT = '%(asctime)s | %(levelname)-8s | %(name)-20s | %(message)s'
LOG_DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
def setup_logging(debug: bool = False, log_dir: str = "./logs") -> None:
"""
配置日志系统(同时输出到控制台和文件)
Args:
debug: 是否启用调试模式
log_dir: 日志文件目录
"""
level = logging.DEBUG if debug else logging.INFO
# 创建日志目录
log_path = Path(log_dir)
log_path.mkdir(parents=True, exist_ok=True)
# 日志文件路径(按日期分文件)
today_str = datetime.now().strftime('%Y%m%d')
log_file = log_path / f"stock_analysis_{today_str}.log"
debug_log_file = log_path / f"stock_analysis_debug_{today_str}.log"
# 创建根 logger
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG) # 根 logger 设为 DEBUG,由 handler 控制输出级别
# Handler 1: 控制台输出
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(level)
console_handler.setFormatter(logging.Formatter(LOG_FORMAT, LOG_DATE_FORMAT))
root_logger.addHandler(console_handler)
# Handler 2: 常规日志文件(INFO 级别,10MB 轮转)
file_handler = RotatingFileHandler(
log_file,
maxBytes=10 * 1024 * 1024, # 10MB
backupCount=5,
encoding='utf-8'
)
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(logging.Formatter(LOG_FORMAT, LOG_DATE_FORMAT))
root_logger.addHandler(file_handler)
# Handler 3: 调试日志文件(DEBUG 级别,包含所有详细信息)
debug_handler = RotatingFileHandler(
debug_log_file,
maxBytes=50 * 1024 * 1024, # 50MB
backupCount=3,
encoding='utf-8'
)
debug_handler.setLevel(logging.DEBUG)
debug_handler.setFormatter(logging.Formatter(LOG_FORMAT, LOG_DATE_FORMAT))
root_logger.addHandler(debug_handler)
# 降低第三方库的日志级别
logging.getLogger('urllib3').setLevel(logging.WARNING)
logging.getLogger('sqlalchemy').setLevel(logging.WARNING)
logging.getLogger('google').setLevel(logging.WARNING)
logging.getLogger('httpx').setLevel(logging.WARNING)
logging.info(f"日志系统初始化完成,日志目录: {log_path.absolute()}")
logging.info(f"常规日志: {log_file}")
logging.info(f"调试日志: {debug_log_file}")
logger = logging.getLogger(__name__)
class StockAnalysisPipeline:
"""
股票分析主流程调度器
职责:
1. 管理整个分析流程
2. 协调数据获取、存储、搜索、分析、通知等模块
3. 实现并发控制和异常处理
"""
def __init__(
self,
config: Optional[Config] = None,
max_workers: Optional[int] = None
):
"""
初始化调度器
Args:
config: 配置对象(可选,默认使用全局配置)
max_workers: 最大并发线程数(可选,默认从配置读取)
"""
self.config = config or get_config()
self.max_workers = max_workers or self.config.max_workers
# 初始化各模块
self.db = get_db()
self.fetcher_manager = DataFetcherManager()
self.akshare_fetcher = AkshareFetcher() # 用于获取增强数据(量比、筹码等)
self.trend_analyzer = StockTrendAnalyzer() # 趋势分析器
self.analyzer = GeminiAnalyzer()
self.notifier = NotificationService()
# 初始化搜索服务
self.search_service = SearchService(
tavily_keys=self.config.tavily_api_keys,
serpapi_keys=self.config.serpapi_keys,
)
logger.info(f"调度器初始化完成,最大并发数: {self.max_workers}")
logger.info("已启用趋势分析器 (MA5>MA10>MA20 多头判断)")
if self.search_service.is_available:
logger.info("搜索服务已启用 (Tavily/SerpAPI)")
else:
logger.warning("搜索服务未启用(未配置 API Key)")
def fetch_and_save_stock_data(
self,
code: str,
force_refresh: bool = False
) -> Tuple[bool, Optional[str]]:
"""
获取并保存单只股票数据
断点续传逻辑:
1. 检查数据库是否已有今日数据
2. 如果有且不强制刷新,则跳过网络请求
3. 否则从数据源获取并保存
Args:
code: 股票代码
force_refresh: 是否强制刷新(忽略本地缓存)
Returns:
Tuple[是否成功, 错误信息]
"""
try:
today = date.today()
# 断点续传检查:如果今日数据已存在,跳过
if not force_refresh and self.db.has_today_data(code, today):
logger.info(f"[{code}] 今日数据已存在,跳过获取(断点续传)")
return True, None
# 从数据源获取数据
logger.info(f"[{code}] 开始从数据源获取数据...")
df, source_name = self.fetcher_manager.get_daily_data(code, days=30)
if df is None or df.empty:
return False, "获取数据为空"
# 保存到数据库
saved_count = self.db.save_daily_data(df, code, source_name)
logger.info(f"[{code}] 数据保存成功(来源: {source_name},新增 {saved_count} 条)")
return True, None
except Exception as e:
error_msg = f"获取/保存数据失败: {str(e)}"
logger.error(f"[{code}] {error_msg}")
return False, error_msg
def analyze_stock(self, code: str) -> Optional[AnalysisResult]:
"""
分析单只股票(增强版:含量比、换手率、筹码分析、多维度情报)
流程:
1. 获取实时行情(量比、换手率)
2. 获取筹码分布
3. 进行趋势分析(基于交易理念)
4. 多维度情报搜索(最新消息+风险排查+业绩预期)
5. 从数据库获取分析上下文
6. 调用 AI 进行综合分析
Args:
code: 股票代码
Returns:
AnalysisResult 或 None(如果分析失败)
"""
try:
# 获取股票名称(优先从实时行情获取真实名称)
stock_name = STOCK_NAME_MAP.get(code, '')
# Step 1: 获取实时行情(量比、换手率等)
realtime_quote: Optional[RealtimeQuote] = None
try:
realtime_quote = self.akshare_fetcher.get_realtime_quote(code)
if realtime_quote:
# 使用实时行情返回的真实股票名称
if realtime_quote.name:
stock_name = realtime_quote.name
logger.info(f"[{code}] {stock_name} 实时行情: 价格={realtime_quote.price}, "
f"量比={realtime_quote.volume_ratio}, 换手率={realtime_quote.turnover_rate}%")
except Exception as e:
logger.warning(f"[{code}] 获取实时行情失败: {e}")
# 如果还是没有名称,使用代码作为名称
if not stock_name:
stock_name = f'股票{code}'
# Step 2: 获取筹码分布
chip_data: Optional[ChipDistribution] = None
try:
chip_data = self.akshare_fetcher.get_chip_distribution(code)
if chip_data:
logger.info(f"[{code}] 筹码分布: 获利比例={chip_data.profit_ratio:.1%}, "
f"90%集中度={chip_data.concentration_90:.2%}")
except Exception as e:
logger.warning(f"[{code}] 获取筹码分布失败: {e}")
# Step 3: 趋势分析(基于交易理念)
trend_result: Optional[TrendAnalysisResult] = None
try:
# 获取历史数据进行趋势分析
context = self.db.get_analysis_context(code)
if context and 'raw_data' in context:
import pandas as pd
raw_data = context['raw_data']
if isinstance(raw_data, list) and len(raw_data) > 0:
df = pd.DataFrame(raw_data)
trend_result = self.trend_analyzer.analyze(df, code)
logger.info(f"[{code}] 趋势分析: {trend_result.trend_status.value}, "
f"买入信号={trend_result.buy_signal.value}, 评分={trend_result.signal_score}")
except Exception as e:
logger.warning(f"[{code}] 趋势分析失败: {e}")
# Step 4: 多维度情报搜索(最新消息+风险排查+业绩预期)
news_context = None
if self.search_service.is_available:
logger.info(f"[{code}] 开始多维度情报搜索...")
# 使用多维度搜索(最多3次搜索)
intel_results = self.search_service.search_comprehensive_intel(
stock_code=code,
stock_name=stock_name,
max_searches=3
)
# 格式化情报报告
if intel_results:
news_context = self.search_service.format_intel_report(intel_results, stock_name)
total_results = sum(
len(r.results) for r in intel_results.values() if r.success
)
logger.info(f"[{code}] 情报搜索完成: 共 {total_results} 条结果")
logger.debug(f"[{code}] 情报搜索结果:\n{news_context}")
else:
logger.info(f"[{code}] 搜索服务不可用,跳过情报搜索")
# Step 5: 获取分析上下文(技术面数据)
context = self.db.get_analysis_context(code)
if context is None:
logger.warning(f"[{code}] 无法获取分析上下文,跳过分析")
return None
# Step 6: 增强上下文数据(添加实时行情、筹码、趋势分析结果、股票名称)
enhanced_context = self._enhance_context(
context,
realtime_quote,
chip_data,
trend_result,
stock_name # 传入股票名称
)
# Step 7: 调用 AI 分析(传入增强的上下文和新闻)
result = self.analyzer.analyze(enhanced_context, news_context=news_context)
return result
except Exception as e:
logger.error(f"[{code}] 分析失败: {e}")
logger.exception(f"[{code}] 详细错误信息:")
return None
def _enhance_context(
self,
context: Dict[str, Any],
realtime_quote: Optional[RealtimeQuote],
chip_data: Optional[ChipDistribution],
trend_result: Optional[TrendAnalysisResult],
stock_name: str = ""
) -> Dict[str, Any]:
"""
增强分析上下文
将实时行情、筹码分布、趋势分析结果、股票名称添加到上下文中
Args:
context: 原始上下文
realtime_quote: 实时行情数据
chip_data: 筹码分布数据
trend_result: 趋势分析结果
stock_name: 股票名称
Returns:
增强后的上下文
"""
enhanced = context.copy()
# 添加股票名称
if stock_name:
enhanced['stock_name'] = stock_name
elif realtime_quote and realtime_quote.name:
enhanced['stock_name'] = realtime_quote.name
# 添加实时行情
if realtime_quote:
enhanced['realtime'] = {
'name': realtime_quote.name, # 股票名称
'price': realtime_quote.price,
'volume_ratio': realtime_quote.volume_ratio,
'volume_ratio_desc': self._describe_volume_ratio(realtime_quote.volume_ratio),
'turnover_rate': realtime_quote.turnover_rate,
'pe_ratio': realtime_quote.pe_ratio,
'pb_ratio': realtime_quote.pb_ratio,
'total_mv': realtime_quote.total_mv,
'circ_mv': realtime_quote.circ_mv,
'change_60d': realtime_quote.change_60d,
}
# 添加筹码分布
if chip_data:
current_price = realtime_quote.price if realtime_quote else 0
enhanced['chip'] = {
'profit_ratio': chip_data.profit_ratio,
'avg_cost': chip_data.avg_cost,
'concentration_90': chip_data.concentration_90,
'concentration_70': chip_data.concentration_70,
'chip_status': chip_data.get_chip_status(current_price),
}
# 添加趋势分析结果
if trend_result:
enhanced['trend_analysis'] = {
'trend_status': trend_result.trend_status.value,
'ma_alignment': trend_result.ma_alignment,
'trend_strength': trend_result.trend_strength,
'bias_ma5': trend_result.bias_ma5,
'bias_ma10': trend_result.bias_ma10,
'volume_status': trend_result.volume_status.value,
'volume_trend': trend_result.volume_trend,
'buy_signal': trend_result.buy_signal.value,
'signal_score': trend_result.signal_score,
'signal_reasons': trend_result.signal_reasons,
'risk_factors': trend_result.risk_factors,
}
return enhanced
def _describe_volume_ratio(self, volume_ratio: float) -> str:
"""
量比描述
量比 = 当前成交量 / 过去5日平均成交量
"""
if volume_ratio < 0.5:
return "极度萎缩"
elif volume_ratio < 0.8:
return "明显萎缩"
elif volume_ratio < 1.2:
return "正常"
elif volume_ratio < 2.0:
return "温和放量"
elif volume_ratio < 3.0:
return "明显放量"
else:
return "巨量"
def process_single_stock(
self,
code: str,
skip_analysis: bool = False
) -> Optional[AnalysisResult]:
"""
处理单只股票的完整流程
包括:
1. 获取数据
2. 保存数据
3. AI 分析
此方法会被线程池调用,需要处理好异常
Args:
code: 股票代码
skip_analysis: 是否跳过 AI 分析
Returns:
AnalysisResult 或 None
"""
logger.info(f"========== 开始处理 {code} ==========")
try:
# Step 1: 获取并保存数据
success, error = self.fetch_and_save_stock_data(code)
if not success:
logger.warning(f"[{code}] 数据获取失败: {error}")
# 即使获取失败,也尝试用已有数据分析
# Step 2: AI 分析
if skip_analysis:
logger.info(f"[{code}] 跳过 AI 分析(dry-run 模式)")
return None
result = self.analyze_stock(code)
if result:
logger.info(
f"[{code}] 分析完成: {result.operation_advice}, "
f"评分 {result.sentiment_score}"
)
return result
except Exception as e:
# 捕获所有异常,确保单股失败不影响整体
logger.exception(f"[{code}] 处理过程发生未知异常: {e}")
return None
def run(
self,
stock_codes: Optional[List[str]] = None,
dry_run: bool = False,
send_notification: bool = True
) -> List[AnalysisResult]:
"""
运行完整的分析流程
流程:
1. 获取待分析的股票列表
2. 使用线程池并发处理
3. 收集分析结果
4. 发送通知
Args:
stock_codes: 股票代码列表(可选,默认使用配置中的自选股)
dry_run: 是否仅获取数据不分析
send_notification: 是否发送推送通知
Returns:
分析结果列表
"""
start_time = time.time()
# 使用配置中的股票列表
if stock_codes is None:
stock_codes = self.config.stock_list
if not stock_codes:
logger.error("未配置自选股列表,请在 .env 文件中设置 STOCK_LIST")
return []
logger.info(f"===== 开始分析 {len(stock_codes)} 只股票 =====")
logger.info(f"股票列表: {', '.join(stock_codes)}")
logger.info(f"并发数: {self.max_workers}, 模式: {'仅获取数据' if dry_run else '完整分析'}")
results: List[AnalysisResult] = []
# 使用线程池并发处理
# 注意:max_workers 设置较低(默认3)以避免触发反爬
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# 提交任务
future_to_code = {
executor.submit(
self.process_single_stock,
code,
skip_analysis=dry_run
): code
for code in stock_codes
}
# 收集结果
for future in as_completed(future_to_code):
code = future_to_code[future]
try:
result = future.result()
if result:
results.append(result)
except Exception as e:
logger.error(f"[{code}] 任务执行失败: {e}")
# 统计
elapsed_time = time.time() - start_time
# dry-run 模式下,数据获取成功即视为成功
if dry_run:
# 检查哪些股票的数据今天已存在
success_count = sum(1 for code in stock_codes if self.db.has_today_data(code))
fail_count = len(stock_codes) - success_count
else:
success_count = len(results)
fail_count = len(stock_codes) - success_count
logger.info(f"===== 分析完成 =====")
logger.info(f"成功: {success_count}, 失败: {fail_count}, 耗时: {elapsed_time:.2f} 秒")
# 发送通知
if results and send_notification and not dry_run:
self._send_notifications(results)
return results
def _send_notifications(self, results: List[AnalysisResult]) -> None:
"""
发送分析结果通知
生成决策仪表盘格式的报告
Args:
results: 分析结果列表
"""
try:
logger.info("生成决策仪表盘日报...")
# 生成决策仪表盘格式的详细日报
report = self.notifier.generate_dashboard_report(results)
# 保存到本地
filepath = self.notifier.save_report_to_file(report)
logger.info(f"决策仪表盘日报已保存: {filepath}")
# 推送通知
if self.notifier.is_available():
# 生成精简版决策仪表盘用于推送
dashboard_content = self.notifier.generate_wechat_dashboard(results)
logger.info(f"决策仪表盘长度: {len(dashboard_content)} 字符")
logger.debug(f"推送内容:\n{dashboard_content}")
success = self.notifier.send(dashboard_content)
if success:
logger.info("决策仪表盘推送成功")
else:
logger.warning("决策仪表盘推送失败")
else:
logger.info("通知渠道未配置,跳过推送")
except Exception as e:
logger.error(f"发送通知失败: {e}")
def parse_arguments() -> argparse.Namespace:
"""解析命令行参数"""
parser = argparse.ArgumentParser(
description='A股自选股智能分析系统',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog='''
示例:
python main.py # 正常运行
python main.py --debug # 调试模式
python main.py --dry-run # 仅获取数据,不进行 AI 分析
python main.py --stocks 600519,000001 # 指定分析特定股票
python main.py --no-notify # 不发送推送通知
python main.py --schedule # 启用定时任务模式
python main.py --market-review # 仅运行大盘复盘
'''
)
parser.add_argument(
'--debug',
action='store_true',
help='启用调试模式,输出详细日志'
)
parser.add_argument(
'--dry-run',
action='store_true',
help='仅获取数据,不进行 AI 分析'
)
parser.add_argument(
'--stocks',
type=str,
help='指定要分析的股票代码,逗号分隔(覆盖配置文件)'
)
parser.add_argument(
'--no-notify',
action='store_true',
help='不发送推送通知'
)
parser.add_argument(
'--workers',
type=int,
default=None,
help='并发线程数(默认使用配置值)'
)
parser.add_argument(
'--schedule',
action='store_true',
help='启用定时任务模式,每日定时执行'
)
parser.add_argument(
'--market-review',
action='store_true',
help='仅运行大盘复盘分析'
)
parser.add_argument(
'--no-market-review',
action='store_true',
help='跳过大盘复盘分析'
)
return parser.parse_args()
def run_market_review(notifier: NotificationService, analyzer=None, search_service=None) -> Optional[str]:
"""
执行大盘复盘分析
Args:
notifier: 通知服务
analyzer: AI分析器(可选)
search_service: 搜索服务(可选)
Returns:
复盘报告文本
"""
logger.info("开始执行大盘复盘分析...")
try:
market_analyzer = MarketAnalyzer(
search_service=search_service,
analyzer=analyzer
)
# 执行复盘
review_report = market_analyzer.run_daily_review()
if review_report:
# 推送通知
if notifier.is_available():
# 添加标题
report_content = f"🎯 大盘复盘\n\n{review_report}"
success = notifier.send(report_content)
if success:
logger.info("大盘复盘推送成功")
else:
logger.warning("大盘复盘推送失败")
return review_report
except Exception as e:
logger.error(f"大盘复盘分析失败: {e}")
return None
def run_full_analysis(
config: Config,
args: argparse.Namespace,
stock_codes: Optional[List[str]] = None
):
"""
执行完整的分析流程(个股 + 大盘复盘)
这是定时任务调用的主函数
"""
try:
# 创建调度器
pipeline = StockAnalysisPipeline(
config=config,
max_workers=args.workers
)
# 1. 运行个股分析
results = pipeline.run(
stock_codes=stock_codes,
dry_run=args.dry_run,
send_notification=not args.no_notify
)
# 2. 运行大盘复盘(如果启用且不是仅个股模式)
market_report = ""
if config.market_review_enabled and not args.no_market_review:
# 只调用一次,并获取结果
review_result = run_market_review(
notifier=pipeline.notifier,
analyzer=pipeline.analyzer,
search_service=pipeline.search_service
)
# 如果有结果,赋值给 market_report 用于后续飞书文档生成
if review_result:
market_report = review_result
# 输出摘要
if results:
logger.info("\n===== 分析结果摘要 =====")
for r in sorted(results, key=lambda x: x.sentiment_score, reverse=True):
emoji = r.get_emoji()
logger.info(
f"{emoji} {r.name}({r.code}): {r.operation_advice} | "
f"评分 {r.sentiment_score} | {r.trend_prediction}"
)
logger.info("\n任务执行完成")
# === 新增:生成飞书云文档 ===
try:
feishu_doc = FeishuDocManager()
if feishu_doc.is_configured() and (results or market_report):
logger.info("正在创建飞书云文档...")
# 1. 准备标题 "01-01 13:01大盘复盘"
tz_cn = timezone(timedelta(hours=8))
now = datetime.now(tz_cn)
doc_title = f"{now.strftime('%Y-%m-%d %H:%M')} 大盘复盘"
# 2. 准备内容 (拼接个股分析和大盘复盘)
full_content = ""
# 添加大盘复盘内容(如果有)
if market_report:
full_content += f"# 📈 大盘复盘\n\n{market_report}\n\n---\n\n"
# 添加个股决策仪表盘(使用 NotificationService 生成)
if results:
dashboard_content = pipeline.notifier.generate_dashboard_report(results)
full_content += f"# 🚀 个股决策仪表盘\n\n{dashboard_content}"
# 3. 创建文档
doc_url = feishu_doc.create_daily_doc(doc_title, full_content)
if doc_url:
logger.info(f"飞书云文档创建成功: {doc_url}")
# 可选:将文档链接也推送到群里
pipeline.notifier.send(f"[{now.strftime('%Y-%m-%d %H:%M')}] 复盘文档创建成功: {doc_url}")
except Exception as e:
logger.error(f"飞书文档生成失败: {e}")
except Exception as e:
logger.exception(f"分析流程执行失败: {e}")
def main() -> int:
"""
主入口函数
Returns:
退出码(0 表示成功)
"""
# 解析命令行参数
args = parse_arguments()
# 加载配置(在设置日志前加载,以获取日志目录)
config = get_config()
# 配置日志(输出到控制台和文件)
setup_logging(debug=args.debug, log_dir=config.log_dir)
logger.info("=" * 60)
logger.info("A股自选股智能分析系统 启动")
logger.info(f"运行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
logger.info("=" * 60)
# 验证配置
warnings = config.validate()
for warning in warnings:
logger.warning(warning)
# 解析股票列表
stock_codes = None
if args.stocks:
stock_codes = [code.strip() for code in args.stocks.split(',') if code.strip()]
logger.info(f"使用命令行指定的股票列表: {stock_codes}")
try:
# 模式1: 仅大盘复盘
if args.market_review:
logger.info("模式: 仅大盘复盘")
notifier = NotificationService()
# 初始化搜索服务和分析器(如果有配置)
search_service = None
analyzer = None
if config.tavily_api_keys or config.serpapi_keys:
search_service = SearchService(
tavily_keys=config.tavily_api_keys,
serpapi_keys=config.serpapi_keys
)
if config.gemini_api_key:
analyzer = GeminiAnalyzer(api_key=config.gemini_api_key)
run_market_review(notifier, analyzer, search_service)
return 0
# 模式2: 定时任务模式
if args.schedule or config.schedule_enabled:
logger.info("模式: 定时任务")
logger.info(f"每日执行时间: {config.schedule_time}")
from scheduler import run_with_schedule
def scheduled_task():
run_full_analysis(config, args, stock_codes)
run_with_schedule(
task=scheduled_task,
schedule_time=config.schedule_time,
run_immediately=True # 启动时先执行一次
)
return 0
# 模式3: 正常单次运行
run_full_analysis(config, args, stock_codes)
logger.info("\n程序执行完成")
return 0
except KeyboardInterrupt:
logger.info("\n用户中断,程序退出")
return 130
except Exception as e:
logger.exception(f"程序执行失败: {e}")
return 1
if __name__ == "__main__":
sys.exit(main())