22加密货币数据源
33使用 CCXT (Coinbase) 获取数据
44"""
5- from typing import Dict , List , Any , Optional
5+ from typing import Dict , List , Any , Optional , Tuple
66from datetime import datetime , timedelta
77import ccxt
88
@@ -21,6 +21,9 @@ class CryptoDataSource(BaseDataSource):
2121 # 时间周期映射
2222 TIMEFRAME_MAP = CCXTConfig .TIMEFRAME_MAP
2323
24+ # 常见的报价货币列表(按优先级排序)
25+ COMMON_QUOTES = ['USDT' , 'USD' , 'BTC' , 'ETH' , 'BUSD' , 'USDC' , 'BNB' , 'EUR' , 'GBP' ]
26+
2427 def __init__ (self ):
2528 config = {
2629 'timeout' : CCXTConfig .TIMEOUT ,
@@ -43,27 +46,189 @@ def __init__(self):
4346
4447 exchange_class = getattr (ccxt , exchange_id )
4548 self .exchange = exchange_class (config )
49+
50+ # 延迟加载 markets(首次使用时加载)
51+ self ._markets_loaded = False
52+ self ._markets_cache = None
53+
54+ def _ensure_markets_loaded (self ) -> bool :
55+ """确保 markets 已加载(用于符号验证)"""
56+ if self ._markets_loaded and self ._markets_cache is not None :
57+ return True
58+
59+ try :
60+ # 某些交易所需要显式加载 markets
61+ if hasattr (self .exchange , 'load_markets' ):
62+ self .exchange .load_markets (reload = False )
63+ self ._markets_cache = getattr (self .exchange , 'markets' , {})
64+ self ._markets_loaded = True
65+ return True
66+ except Exception as e :
67+ logger .debug (f"Failed to load markets for { self .exchange .id } : { e } " )
68+ return False
69+
70+ def _normalize_symbol (self , symbol : str ) -> Tuple [str , str ]:
71+ """
72+ 规范化符号格式,返回 (normalized_symbol, base_currency)
73+
74+ 处理各种输入格式:
75+ - BTC/USDT -> BTC/USDT
76+ - BTCUSDT -> BTC/USDT
77+ - BTC/USDT:USDT -> BTC/USDT
78+ - BTC -> BTC/USDT (默认)
79+ - PI, TRX -> PI/USDT, TRX/USDT
80+ """
81+ if not symbol :
82+ return '' , ''
83+
84+ sym = symbol .strip ()
85+
86+ # 移除 swap/futures 后缀
87+ if ':' in sym :
88+ sym = sym .split (':' , 1 )[0 ]
89+
90+ sym = sym .upper ()
91+
92+ # 如果已经有分隔符,直接解析
93+ if '/' in sym :
94+ parts = sym .split ('/' , 1 )
95+ base = parts [0 ].strip ()
96+ quote = parts [1 ].strip () if len (parts ) > 1 else ''
97+ if base and quote :
98+ return f"{ base } /{ quote } " , base
99+
100+ # 尝试从常见报价货币中识别
101+ for quote in self .COMMON_QUOTES :
102+ if sym .endswith (quote ) and len (sym ) > len (quote ):
103+ base = sym [:- len (quote )]
104+ if base :
105+ return f"{ base } /{ quote } " , base
106+
107+ # 如果无法识别,默认使用 USDT
108+ return f"{ sym } /USDT" , sym
109+
110+ def _find_valid_symbol (self , base : str , preferred_quote : str = 'USDT' ) -> Optional [str ]:
111+ """
112+ 在交易所的 markets 中查找有效的符号
113+
114+ Args:
115+ base: 基础货币(如 'PI', 'TRX')
116+ preferred_quote: 首选的报价货币
117+
118+ Returns:
119+ 找到的有效符号,如果找不到则返回 None
120+ """
121+ if not self ._ensure_markets_loaded ():
122+ return None
123+
124+ markets = self ._markets_cache or {}
125+ if not markets :
126+ return None
127+
128+ # 按优先级尝试不同的报价货币
129+ quotes_to_try = [preferred_quote ] + [q for q in self .COMMON_QUOTES if q != preferred_quote ]
130+
131+ for quote in quotes_to_try :
132+ candidate = f"{ base } /{ quote } "
133+ if candidate in markets :
134+ market = markets [candidate ]
135+ # 检查市场是否活跃
136+ if market .get ('active' , True ):
137+ return candidate
138+
139+ return None
140+
141+ def _normalize_symbol_for_exchange (self , symbol : str ) -> str :
142+ """
143+ 根据交易所特性规范化符号
144+
145+ 不同交易所的符号格式要求:
146+ - Binance: BTC/USDT (标准格式)
147+ - OKX: BTC/USDT (标准格式,但某些币种可能不支持)
148+ - Coinbase: BTC/USD (通常使用 USD 而不是 USDT)
149+ - Kraken: XBT/USD (BTC 映射为 XBT)
150+ - Bitfinex: tBTCUST (特殊格式)
151+ """
152+ normalized , base = self ._normalize_symbol (symbol )
153+
154+ if not normalized or not base :
155+ return symbol
156+
157+ exchange_id = getattr (self .exchange , 'id' , '' ).lower ()
158+
159+ # 特殊处理:某些交易所的符号映射
160+ if exchange_id == 'coinbase' :
161+ # Coinbase 通常使用 USD 而不是 USDT
162+ if normalized .endswith ('/USDT' ):
163+ usd_version = normalized .replace ('/USDT' , '/USD' )
164+ if self ._ensure_markets_loaded ():
165+ markets = self ._markets_cache or {}
166+ if usd_version in markets :
167+ return usd_version
168+
169+ # 尝试在交易所中查找有效符号
170+ if self ._ensure_markets_loaded ():
171+ valid_symbol = self ._find_valid_symbol (base , normalized .split ('/' )[1 ] if '/' in normalized else 'USDT' )
172+ if valid_symbol :
173+ return valid_symbol
174+
175+ return normalized
46176
47177 def get_ticker (self , symbol : str ) -> Dict [str , Any ]:
48178 """
49179 Get latest ticker for a crypto symbol via CCXT.
50180
51181 Accepts common formats:
52- - BTC/USDT
53- - BTCUSDT
54- - BTC/USDT:USDT (swap-style suffix, will be normalized)
182+ - BTC/USDT, BTCUSDT, BTC/USDT:USDT
183+ - PI, TRX (will be normalized and searched across exchanges)
184+ - 自动适配不同交易所的符号格式要求
55185 """
56- sym = (symbol or "" ).strip ()
57- if ":" in sym :
58- sym = sym .split (":" , 1 )[0 ]
59- sym = sym .upper ()
60- if "/" not in sym :
61- # Coinbase often uses USD, check if we need to adapt
62- if sym .endswith ("USDT" ) and len (sym ) > 4 :
63- sym = f"{ sym [:- 4 ]} /USDT"
64- elif sym .endswith ("USD" ) and len (sym ) > 3 :
65- sym = f"{ sym [:- 3 ]} /USD"
66- return self .exchange .fetch_ticker (sym )
186+ if not symbol or not symbol .strip ():
187+ return {'last' : 0 , 'symbol' : symbol }
188+
189+ # 规范化符号
190+ normalized = self ._normalize_symbol_for_exchange (symbol )
191+
192+ if not normalized :
193+ logger .warning (f"Failed to normalize symbol: { symbol } " )
194+ return {'last' : 0 , 'symbol' : symbol }
195+
196+ # 尝试获取 ticker
197+ try :
198+ ticker = self .exchange .fetch_ticker (normalized )
199+ if ticker and isinstance (ticker , dict ):
200+ return ticker
201+ except Exception as e :
202+ error_msg = str (e ).lower ()
203+ is_symbol_error = any (keyword in error_msg for keyword in [
204+ 'does not have market symbol' ,
205+ 'symbol not found' ,
206+ 'invalid symbol' ,
207+ 'market does not exist' ,
208+ 'trading pair not found'
209+ ])
210+
211+ if is_symbol_error :
212+ # 尝试查找替代符号
213+ base = normalized .split ('/' )[0 ] if '/' in normalized else normalized
214+ if self ._ensure_markets_loaded ():
215+ valid_symbol = self ._find_valid_symbol (base )
216+ if valid_symbol and valid_symbol != normalized :
217+ try :
218+ logger .debug (f"Trying alternative symbol: { valid_symbol } (original: { symbol } , first attempt: { normalized } )" )
219+ ticker = self .exchange .fetch_ticker (valid_symbol )
220+ if ticker and isinstance (ticker , dict ):
221+ return ticker
222+ except Exception as e2 :
223+ logger .debug (f"Alternative symbol { valid_symbol } also failed: { e2 } " )
224+
225+ # 如果所有尝试都失败,记录警告并返回默认值
226+ logger .warning (
227+ f"Symbol '{ symbol } ' (normalized: { normalized } ) not found on { self .exchange .id } . "
228+ f"Error: { str (e )[:100 ]} "
229+ )
230+
231+ return {'last' : 0 , 'symbol' : symbol }
67232
68233 def get_kline (
69234 self ,
@@ -78,11 +243,12 @@ def get_kline(
78243 try :
79244 ccxt_timeframe = self .TIMEFRAME_MAP .get (timeframe , '1d' )
80245
81- # 构建交易对符号
82- if not symbol .endswith ('USDT' ) and not symbol .endswith ('USD' ):
83- symbol_pair = f'{ symbol } /USDT'
84- else :
85- symbol_pair = symbol
246+ # 使用统一的符号规范化方法
247+ symbol_pair = self ._normalize_symbol_for_exchange (symbol )
248+
249+ if not symbol_pair :
250+ logger .warning (f"Failed to normalize symbol for K-line: { symbol } " )
251+ return []
86252
87253 # logger.info(f"获取加密货币K线: {symbol_pair}, 周期: {ccxt_timeframe}, 条数: {limit}")
88254
0 commit comments