forked from tingxifa/okx-grid-bot
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhelpers.py
More file actions
148 lines (128 loc) · 5.19 KB
/
helpers.py
File metadata and controls
148 lines (128 loc) · 5.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import logging
import requests
from tenacity import retry, stop_after_attempt, wait_exponential
from config import PUSHPLUS_TOKEN
import time
import psutil
import os
from logging.handlers import TimedRotatingFileHandler
def format_trade_message(side, symbol, price, amount, total, grid_size, retry_count=None):
"""格式化交易消息为美观的文本格式
Args:
side (str): 交易方向 ('buy' 或 'sell')
symbol (str): 交易对
price (float): 交易价格
amount (float): 交易数量
total (float): 交易总额
grid_size (float): 网格大小
retry_count (tuple, optional): 重试次数,格式为 (当前次数, 最大次数)
Returns:
str: 格式化后的消息文本
"""
# 使用emoji增加可读性
direction_emoji = "🟢" if side == 'buy' else "🔴"
direction_text = "买入" if side == 'buy' else "卖出"
# 构建消息主体
message = f"""
{direction_emoji} {direction_text} {symbol}
━━━━━━━━━━━━━━━━━━━━
💰 价格:{price:.2f} USDT
📊 数量:{amount:.4f} OKB
💵 金额:{total:.2f} USDT
📈 网格:{grid_size}%
"""
# 如果有重试信息,添加重试次数
if retry_count:
current, max_retries = retry_count
message += f"🔄 尝试:{current}/{max_retries}次\n"
# 添加时间戳
message += f"⏰ 时间:{time.strftime('%Y-%m-%d %H:%M:%S')}"
return message
def send_pushplus_message(content, title="交易信号通知"):
if not PUSHPLUS_TOKEN:
logging.error("未配置PUSHPLUS_TOKEN,无法发送通知")
return
url = "https://www.pushplus.plus/send"
data = {
"token": PUSHPLUS_TOKEN,
"title": title,
"content": content,
"template": "txt" # 使用文本模板
}
try:
logging.info(f"正在发送推送通知: {title}")
response = requests.post(url, data=data)
response_json = response.json()
if response.status_code == 200 and response_json.get('code') == 200:
logging.info(f"消息推送成功: {content}")
else:
logging.error(f"消息推送失败: 状态码={response.status_code}, 响应={response_json}")
except Exception as e:
logging.error(f"消息推送异常: {str(e)}", exc_info=True)
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
async def safe_fetch(method, *args, **kwargs):
try:
return await method(*args, **kwargs)
except Exception as e:
logging.error(f"请求失败: {str(e)}")
raise
def debug_watcher():
"""资源监控装饰器"""
def decorator(func):
async def wrapper(*args, **kwargs):
start = time.time()
mem_before = psutil.virtual_memory().used
logging.debug(f"[DEBUG] 开始执行 {func.__name__}")
try:
result = await func(*args, **kwargs)
return result
finally:
cost = time.time() - start
mem_used = psutil.virtual_memory().used - mem_before
logging.debug(f"[DEBUG] {func.__name__} 执行完成 | 耗时: {cost:.3f}s | 内存变化: {mem_used/1024/1024:.2f}MB")
return wrapper
return decorator
class LogConfig:
SINGLE_LOG = True # 强制单文件模式
BACKUP_DAYS = 2 # 保留2天日志
LOG_DIR = os.path.dirname(__file__) # 与main.py相同目录
LOG_LEVEL = logging.INFO
@staticmethod
def setup_logger():
logger = logging.getLogger()
logger.setLevel(LogConfig.LOG_LEVEL)
# 清理所有现有处理器
for handler in logger.handlers[:]:
logger.removeHandler(handler)
# 文件处理器
file_handler = TimedRotatingFileHandler(
os.path.join(LogConfig.LOG_DIR, 'trading_system.log'),
when='midnight',
interval=1,
backupCount=LogConfig.BACKUP_DAYS,
encoding='utf-8',
delay=True
)
file_handler.setFormatter(logging.Formatter(
'%(asctime)s [%(name)s] %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
))
# 控制台处理器
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('%(message)s'))
logger.addHandler(file_handler)
logger.addHandler(console_handler)
@staticmethod
def clean_old_logs():
if not os.path.exists(LogConfig.LOG_DIR):
return
now = time.time()
for fname in os.listdir(LogConfig.LOG_DIR):
if LogConfig.SINGLE_LOG and fname != 'trading_system.log':
continue
path = os.path.join(LogConfig.LOG_DIR, fname)
if os.stat(path).st_mtime < now - LogConfig.BACKUP_DAYS * 86400:
try:
os.remove(path)
except Exception as e:
print(f"删除旧日志失败 {fname}: {str(e)}")