1+ import aiohttp
2+ import asyncio
3+ import logging
4+ from solana .rpc .async_api import AsyncClient
5+ from solana .transaction import Transaction
6+ from solana .system_program import TransferParams , transfer
7+ from solana .keypair import Keypair
8+ from solana .publickey import PublicKey
9+ import csv
10+ import os
11+ import json
12+ from datetime import datetime
13+ import signal
14+ import sys
15+ from typing import Dict , List , Optional
16+
17+ class QuickTokenChecker :
18+ def __init__ (self , token_file ):
19+ self .jupiter_base_url = "https://quote-api.jup.ag/v6/quote"
20+ self .raydium_base_url = "https://api.raydium.io/v2/main/price"
21+ self .sol_address = 'So11111111111111111111111111111111111111112'
22+
23+ # Load tokens from JSON file
24+ with open (token_file , 'r' ) as f :
25+ data = json .load (f )
26+ if isinstance (data , dict ) and 'tokens' in data :
27+ self .token_addresses = {
28+ symbol : {
29+ 'address' : info ['address' ],
30+ 'decimal' : info ['decimal' ]
31+ }
32+ for symbol , info in data ['tokens' ].items ()
33+ }
34+ else :
35+ self .token_addresses = data
36+
37+ print (f"Loaded { len (self .token_addresses )} tokens to check" )
38+
39+ async def get_with_timeout (self , session , url , timeout = 5 , max_retries = 3 , ** kwargs ):
40+ """Make a GET request with timeout and retry logic"""
41+ for attempt in range (max_retries ):
42+ try :
43+ async with asyncio .timeout (timeout ):
44+ async with session .get (url , ** kwargs ) as response :
45+ if response .status == 429 : # Rate limit hit
46+ retry_after = int (response .headers .get ('Retry-After' , 5 ))
47+ await asyncio .sleep (retry_after )
48+ continue
49+
50+ status = response .status
51+ try :
52+ data = await response .json ()
53+ return status , data
54+ except Exception as e :
55+ text = await response .text ()
56+ return status , None
57+
58+ except asyncio .TimeoutError :
59+ if attempt < max_retries - 1 :
60+ await asyncio .sleep (2 ** attempt ) # Exponential backoff
61+ continue
62+ except Exception as e :
63+ if attempt < max_retries - 1 :
64+ await asyncio .sleep (2 ** attempt )
65+ continue
66+
67+ return None , None
68+
69+ async def get_pool_address (self , session , token_address ):
70+ """Get pool address from DexScreener"""
71+ try :
72+ url = f"https://api.dexscreener.com/latest/dex/tokens/{ token_address } "
73+ headers = {
74+ 'User-Agent' : 'Mozilla/5.0' ,
75+ 'Accept' : 'application/json'
76+ }
77+
78+ async with session .get (url , headers = headers ) as response :
79+ if response .status == 200 :
80+ data = await response .json ()
81+ pairs = data .get ('pairs' , [])
82+
83+ # Find Raydium pair
84+ for pair in pairs :
85+ if pair .get ('dexId' ) == 'raydium' :
86+ return {
87+ 'pair_address' : pair .get ('pairAddress' ),
88+ 'price' : float (pair .get ('priceUsd' , 0 ))
89+ }
90+ return None
91+ except Exception :
92+ return None
93+
94+ async def check_jupiter (self , session , symbol , address ):
95+ """
96+ Fetch token price using Jupiter API.
97+ """
98+
99+ # Fetch SOL/USDC price first
100+ sol_price_params = {
101+ 'inputMint' : self .sol_address ,
102+ 'outputMint' : 'EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v' , # USDC mint address
103+ 'amount' : '1000000000' , # Amount in lamports (1 SOL = 10^9 lamports)
104+ 'slippageBps' : 50 # Slippage tolerance in basis points
105+ }
106+
107+ sol_status , sol_data = await self .get_with_timeout (session , self .jupiter_base_url , params = sol_price_params )
108+
109+ if sol_status != 200 or not sol_data or 'outAmount' not in sol_data :
110+ print (f"Error fetching SOL/USDC price from Jupiter: { sol_data } " )
111+ return False , None
112+
113+ # Calculate SOL/USDC price
114+ sol_price_usdc = float (sol_data ['outAmount' ]) / 1e6 # Convert lamports to USDC
115+
116+ # Fetch token/SOL price
117+ token_price_params = {
118+ 'inputMint' : address ,
119+ 'outputMint' : self .sol_address ,
120+ 'amount' : '1000000000' , # Amount in smallest unit of the token
121+ 'slippageBps' : 50 # Slippage tolerance in basis points
122+ }
123+
124+ token_status , token_data = await self .get_with_timeout (session , self .jupiter_base_url , params = token_price_params )
125+
126+ if token_status == 200 and token_data and 'outAmount' in token_data :
127+ sol_value = float (token_data ['outAmount' ]) / float (token_price_params ['amount' ]) # Token/SOL exchange rate
128+ usdc_price = sol_value * sol_price_usdc # Convert SOL value to USDC
129+ # Get decimal from token info
130+ decimal = self .token_addresses [symbol ]['decimal' ]
131+ adjustment = 10 ** (9 - decimal ) if decimal < 9 else 1
132+ usdc_price = usdc_price / adjustment
133+ return True , {'price' : usdc_price }
134+
135+ print (f"Error fetching { symbol } price from Jupiter: { token_data } " )
136+ return False , None
137+
138+ async def check_raydium (self , session , symbol , address ):
139+ """Check token price on Raydium"""
140+ pool_data = await self .get_pool_address (session , address )
141+ if not pool_data :
142+ return False , None
143+
144+ return True , {
145+ 'price' : pool_data ['price' ]
146+ }
147+
148+ class ArbitrageMonitor (QuickTokenChecker ):
149+ def __init__ (self , token_file : str , config : Dict ):
150+ super ().__init__ (token_file )
151+ self .config = config
152+ self .running = False
153+ self .last_check_times : Dict [str , datetime ] = {}
154+ self .error_counts : Dict [str , int ] = {}
155+
156+ self .client = AsyncClient ("https://api.mainnet-beta.solana.com" )
157+ # self.trader = Keypair.from_secret_key(bytes.fromhex("5fLNC11ikbhoJeRp8uSU9nsgnchHjfPDwi1yzBMT93fHyzkNAm6MSZo5PkkyPcUdrMRvW59CGnZytWrQjatZV7yg"))
158+ self .trader = Keypair ();
159+
160+ # Register signal handlers
161+ signal .signal (signal .SIGINT , self .handle_shutdown )
162+ signal .signal (signal .SIGTERM , self .handle_shutdown )
163+
164+ def handle_shutdown (self , signum , frame ):
165+ """Handle graceful shutdown on signals"""
166+ print ("\n Shutdown signal received. Cleaning up..." )
167+ self .running = False
168+
169+ async def monitor_token (self , session : aiohttp .ClientSession , symbol : str , address : str ) -> Optional [dict ]:
170+ """Monitor a single token pair with error handling and rate limiting"""
171+ try :
172+ # Check if we need to wait due to rate limiting
173+ last_check = self .last_check_times .get (symbol )
174+ if last_check :
175+ time_since_last = (datetime .now () - last_check ).total_seconds ()
176+ if time_since_last < self .config ['min_check_interval' ]:
177+ await asyncio .sleep (self .config ['min_check_interval' ] - time_since_last )
178+
179+ # Update last check time
180+ self .last_check_times [symbol ] = datetime .now ()
181+
182+ # Check prices
183+ raydium_available , raydium_data = await self .check_raydium (session , symbol , address )
184+ if raydium_available :
185+ await asyncio .sleep (0.1 ) # Small delay between checks
186+ jupiter_available , jupiter_data = await self .check_jupiter (session , symbol , address )
187+ print (f"price-display: { raydium_data } ==={ jupiter_data } " )
188+ if jupiter_available and raydium_data and jupiter_data :
189+ ray_price = float (raydium_data ['price' ])
190+ jup_price = float (jupiter_data ['price' ])
191+
192+ diff_percent = abs (ray_price - jup_price ) / min (ray_price , jup_price ) * 100
193+
194+ if diff_percent > self .config ['min_price_difference' ]:
195+ # Determine buy/sell venues based on prices
196+ buy_price = min (ray_price , jup_price )
197+ sell_price = max (ray_price , jup_price )
198+ buy_on = 'Raydium' if buy_price == ray_price else 'Jupiter'
199+ sell_on = 'Jupiter' if sell_price == jup_price else 'Raydium'
200+
201+ opportunity = {
202+ 'symbol' : symbol ,
203+ 'address' : address ,
204+ 'buy_on' : buy_on ,
205+ 'sell_on' : sell_on ,
206+ 'buy_price' : buy_price ,
207+ 'sell_price' : sell_price ,
208+ 'difference_percent' : diff_percent ,
209+ 'timestamp' : datetime .now ().isoformat ()
210+ }
211+
212+ return opportunity
213+
214+ return None
215+
216+ except Exception as e :
217+ self .error_counts [symbol ] = self .error_counts .get (symbol , 0 ) + 1
218+ if self .error_counts [symbol ] > self .config ['max_errors' ]:
219+ print (f"Too many errors for { symbol } , considering removal from monitoring" )
220+ return None
221+
222+ async def execute_trade (self , client , trader , buy_dex , sell_dex , amount ):
223+ try :
224+ buy_tx = Transaction ()
225+ buy_tx .add (transfer (TransferParams (
226+ from_pubkey = trader .public_key ,
227+ to_pubkey = PublicKey ("BuyDEXPublicKey" ),
228+ lamports = amount
229+ )))
230+
231+ sell_tx = Transaction ()
232+ sell_tx .add (transfer (TransferParams (
233+ from_pubkey = trader .public_key ,
234+ to_pubkey = PublicKey ("SellDEXPublicKey" ),
235+ lamports = amount
236+ )))
237+
238+ buy_response = await self .client .send_transaction (buy_tx , self .trader )
239+ logging .info (f"Buy Transaction Response: { buy_response } " )
240+
241+ sell_response = await self .client .send_transaction (sell_tx , self .trader )
242+ logging .info (f"Sell Transaction Response: { sell_response } " )
243+
244+ except Exception as e :
245+ logging .error (f"Trade execution failed: { e } " )
246+
247+ async def run_monitoring_loop (self ):
248+ """Main monitoring loop with proper error handling and rate limiting"""
249+ self .running = True
250+ print (f"Starting monitoring loop at { datetime .now ()} " )
251+
252+ while self .running :
253+ try :
254+ timeout = aiohttp .ClientTimeout (total = 30 )
255+ async with aiohttp .ClientSession (timeout = timeout ) as session :
256+ while self .running :
257+ start_time = datetime .now ()
258+ opportunities = []
259+
260+ for symbol , address in self .token_addresses .items ():
261+ if symbol != 'SOL' :
262+ try :
263+ result = await self .monitor_token (session , symbol , address ['address' ])
264+ if isinstance (result , dict ):
265+ opportunities .append (result )
266+ except Exception as e :
267+ continue
268+
269+ if opportunities :
270+ self .save_opportunities (opportunities )
271+
272+ for opp in opportunities :
273+ print (f"\n 🔥 Opportunity found for { opp ['symbol' ]} :" )
274+ print (f"Buy on { opp ['buy_on' ]} at ${ opp ['buy_price' ]:.6f} " )
275+ print (f"Sell on { opp ['sell_on' ]} at ${ opp ['sell_price' ]:.6f} " )
276+ print (f"Difference: { opp ['difference_percent' ]:.2f} %" )
277+ await self .execute_trade (session , self .client , self .trader , opp ['buy_on' ], opp ['sell_on' ], 1000 )
278+ await asyncio .sleep (2 )
279+ elapsed = (datetime .now () - start_time ).total_seconds ()
280+ if elapsed < self .config ['check_interval' ]:
281+ await asyncio .sleep (self .config ['check_interval' ] - elapsed )
282+
283+ except Exception as e :
284+ print ("Restarting monitoring loop in 10 seconds..." )
285+ await asyncio .sleep (10 )
286+
287+ def save_opportunities (self , opportunities ):
288+ """Save opportunities to CSV file"""
289+ csv_filename = 'arbitrage_opportunities.csv'
290+ file_exists = os .path .exists (csv_filename )
291+
292+ with open (csv_filename , 'a' , newline = '' ) as f :
293+ headers = [
294+ 'timestamp' ,
295+ 'symbol' ,
296+ 'address' ,
297+ 'buy_on' ,
298+ 'sell_on' ,
299+ 'buy_price' ,
300+ 'sell_price' ,
301+ 'difference_percent'
302+ ]
303+
304+ writer = csv .DictWriter (f , fieldnames = headers )
305+
306+ if not file_exists :
307+ writer .writeheader ()
308+
309+ for opp in opportunities :
310+ row = {
311+ 'timestamp' : datetime .now ().strftime ("%Y-%m-%d %H:%M:%S" ),
312+ 'symbol' : opp ['symbol' ],
313+ 'address' : opp ['address' ],
314+ 'buy_on' : opp ['buy_on' ],
315+ 'sell_on' : opp ['sell_on' ],
316+ 'buy_price' : f"{ opp ['buy_price' ]:.8f} " ,
317+ 'sell_price' : f"{ opp ['sell_price' ]:.8f} " ,
318+ 'difference_percent' : f"{ opp ['difference_percent' ]:.2f} "
319+ }
320+ writer .writerow (row )
321+
322+ print (f"\n Logged { len (opportunities )} opportunities to { csv_filename } " )
323+
324+ async def main ():
325+ # Configuration
326+ config = {
327+ 'check_interval' : 60 , # Seconds between full check cycles
328+ 'min_check_interval' : 0 , # Minimum seconds between checks for same token
329+ 'min_price_difference' : 1.0 , # Minimum price difference percentage
330+ 'max_errors' : 5 , # Maximum errors before warning
331+ 'token_file' : 'sol_pairs.json'
332+ }
333+
334+ monitor = ArbitrageMonitor (config ['token_file' ], config )
335+
336+ try :
337+ await monitor .run_monitoring_loop ()
338+ except Exception as e :
339+ print (f"Fatal error: { str (e )} " )
340+ finally :
341+ print ("\n Shutting down..." )
342+
343+ if __name__ == "__main__" :
344+ print ("Starting the p05h SOL arbitrage monitor..." )
345+ asyncio .run (main ())
0 commit comments