Skip to content

Commit 3fb0a9a

Browse files
committed
feat(examples): listen and parse wallet txs
1 parent 9731b17 commit 3fb0a9a

File tree

1 file changed

+318
-0
lines changed

1 file changed

+318
-0
lines changed
Lines changed: 318 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,318 @@
1+
"""
2+
Listens for pump.fun bonding curve buy/sell transactions involving a specific wallet.
3+
Filters transactions to show only buy/sell operations on pump.fun bonding curves,
4+
excluding pump AMM (migrated) transactions..
5+
"""
6+
7+
import asyncio
8+
import base64
9+
import binascii
10+
import json
11+
import os
12+
import struct
13+
import sys
14+
from datetime import datetime
15+
16+
import base58
17+
import websockets
18+
from dotenv import load_dotenv
19+
20+
load_dotenv()
21+
22+
# Configuration
23+
WSS_ENDPOINT = os.environ.get("SOLANA_NODE_WSS_ENDPOINT")
24+
WALLET_TO_TRACK = "..." # Change this to your target wallet
25+
26+
# Pump.fun program constants
27+
PUMP_BONDING_CURVE_PROGRAM_ID = "6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P"
28+
PUMP_AMM_PROGRAM_ID = "pAMMBay6oceH9fJKBRHGP5D4bD4sWpmSwMn52FMfXEA"
29+
30+
# TradeEvent discriminator and parsing constants
31+
TRADE_EVENT_DISCRIMINATOR = bytes([189, 219, 127, 211, 78, 230, 97, 238])
32+
EVENT_DISCRIMINATOR_SIZE = 8
33+
34+
# Display settings
35+
MAX_LOGS_TO_SHOW = 5
36+
RECONNECT_DELAY = 5
37+
MIN_LOG_PARTS_FOR_PROGRAM = 2
38+
PING_INTERVAL = 20
39+
40+
if not WSS_ENDPOINT:
41+
print("Error: SOLANA_NODE_WSS_ENDPOINT environment variable not set")
42+
sys.exit(1)
43+
44+
45+
async def subscribe_to_wallet_logs(websocket):
46+
"""Subscribe to logs mentioning our target wallet."""
47+
subscription_message = json.dumps(
48+
{
49+
"jsonrpc": "2.0",
50+
"id": 1,
51+
"method": "logsSubscribe",
52+
"params": [{"mentions": [WALLET_TO_TRACK]}, {"commitment": "processed"}],
53+
}
54+
)
55+
56+
await websocket.send(subscription_message)
57+
print(f"Subscribed to logs mentioning wallet: {WALLET_TO_TRACK}")
58+
59+
# Wait for subscription confirmation
60+
response = await websocket.recv()
61+
response_data = json.loads(response)
62+
if "result" in response_data:
63+
print(f"Subscription confirmed with ID: {response_data['result']}")
64+
else:
65+
print(f"Unexpected subscription response: {response}")
66+
print("=" * 80)
67+
68+
69+
async def keep_connection_alive(websocket):
70+
"""Send ping messages to keep the WebSocket connection alive."""
71+
try:
72+
while True:
73+
await asyncio.sleep(PING_INTERVAL)
74+
try:
75+
pong_waiter = await websocket.ping()
76+
await asyncio.wait_for(pong_waiter, timeout=10)
77+
except TimeoutError:
78+
print("Ping timeout - server not responding")
79+
await websocket.close()
80+
return
81+
except asyncio.CancelledError:
82+
pass
83+
except (websockets.exceptions.WebSocketException, ConnectionError) as e:
84+
print(f"Ping error: {e}")
85+
86+
87+
def is_pump_bonding_curve_buysell(logs):
88+
"""Check if transaction is a pump.fun bonding curve buy/sell transaction."""
89+
# Must mention pump.fun bonding curve program
90+
if not any(PUMP_BONDING_CURVE_PROGRAM_ID in log for log in logs):
91+
return False
92+
93+
# Must NOT mention pump AMM program (exclude migrated tokens)
94+
if any(PUMP_AMM_PROGRAM_ID in log for log in logs):
95+
return False
96+
97+
# Must have TradeEvent data (more reliable than instruction logs)
98+
if parse_trade_event(logs) is None:
99+
return False
100+
101+
return True
102+
103+
104+
def parse_trade_event(logs):
105+
"""Parse TradeEvent data from transaction logs."""
106+
for log in logs:
107+
if "Program data:" in log:
108+
try:
109+
encoded_data = log.split("Program data: ")[1].strip()
110+
decoded_data = base64.b64decode(encoded_data)
111+
112+
if len(decoded_data) >= EVENT_DISCRIMINATOR_SIZE:
113+
discriminator = decoded_data[:EVENT_DISCRIMINATOR_SIZE]
114+
if discriminator == TRADE_EVENT_DISCRIMINATOR:
115+
return decode_trade_event(
116+
decoded_data[EVENT_DISCRIMINATOR_SIZE:]
117+
)
118+
except (ValueError, binascii.Error):
119+
continue
120+
return None
121+
122+
123+
def decode_trade_event(data):
124+
"""Decode TradeEvent structure from raw bytes."""
125+
if len(data) < 32 + 8 + 8 + 1 + 32: # minimum size check
126+
return None
127+
128+
offset = 0
129+
130+
# Parse fields according to TradeEvent structure
131+
mint = data[offset : offset + 32]
132+
offset += 32
133+
134+
sol_amount = struct.unpack("<Q", data[offset : offset + 8])[0]
135+
offset += 8
136+
137+
token_amount = struct.unpack("<Q", data[offset : offset + 8])[0]
138+
offset += 8
139+
140+
is_buy = bool(data[offset])
141+
offset += 1
142+
143+
user = data[offset : offset + 32]
144+
offset += 32
145+
146+
timestamp = struct.unpack("<q", data[offset : offset + 8])[0]
147+
offset += 8
148+
149+
virtual_sol_reserves = struct.unpack("<Q", data[offset : offset + 8])[0]
150+
offset += 8
151+
152+
virtual_token_reserves = struct.unpack("<Q", data[offset : offset + 8])[0]
153+
offset += 8
154+
155+
return {
156+
"mint": base58.b58encode(mint).decode(),
157+
"sol_amount": sol_amount,
158+
"token_amount": token_amount,
159+
"is_buy": is_buy,
160+
"user": base58.b58encode(user).decode(),
161+
"timestamp": timestamp,
162+
"virtual_sol_reserves": virtual_sol_reserves,
163+
"virtual_token_reserves": virtual_token_reserves,
164+
"price_per_token": (sol_amount * 1_000_000) / token_amount
165+
if token_amount > 0
166+
else 0,
167+
}
168+
169+
170+
def display_transaction_info(signature, logs):
171+
"""Display formatted transaction information."""
172+
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
173+
174+
print(f"[{timestamp}] Pump.fun Bonding Curve Transaction:")
175+
print(f" Signature: {signature}")
176+
print(f" Wallet: {WALLET_TO_TRACK}")
177+
178+
# Parse trade event data
179+
trade_data = parse_trade_event(logs)
180+
if trade_data:
181+
print(f" Type: {'BUY' if trade_data['is_buy'] else 'SELL'}")
182+
print(f" Token: {trade_data['mint']}")
183+
print(f" SOL Amount: {trade_data['sol_amount'] / 1_000_000_000:.6f} SOL")
184+
print(f" Token Amount: {trade_data['token_amount']:,}")
185+
print(
186+
f" Price per Token: {trade_data['price_per_token'] / 1_000_000_000:.9f} SOL"
187+
)
188+
print(f" Trader: {trade_data['user']}")
189+
190+
# Extract and display program info
191+
display_program_info(logs)
192+
193+
# Show pump.fun related logs
194+
display_pump_logs(logs)
195+
196+
print(" " + "-" * 76)
197+
198+
199+
def display_program_info(logs):
200+
"""Extract and display program information from logs."""
201+
programs_involved = set()
202+
instructions = []
203+
204+
for log in logs:
205+
if "Program " in log and " invoke" in log:
206+
parts = log.split()
207+
if len(parts) >= MIN_LOG_PARTS_FOR_PROGRAM:
208+
program_id = parts[1]
209+
programs_involved.add(program_id)
210+
elif "Instruction:" in log:
211+
instruction = log.split("Instruction: ")[-1]
212+
instructions.append(instruction)
213+
214+
if programs_involved:
215+
print(f" Programs involved: {', '.join(programs_involved)}")
216+
if instructions:
217+
print(f" Instructions: {', '.join(instructions)}")
218+
219+
220+
def display_pump_logs(logs):
221+
"""Display relevant pump.fun logs."""
222+
pump_logs = [
223+
log
224+
for log in logs
225+
if any(
226+
keyword in log
227+
for keyword in [
228+
"Program log: Instruction:",
229+
"Program data:",
230+
PUMP_BONDING_CURVE_PROGRAM_ID,
231+
]
232+
)
233+
]
234+
235+
if pump_logs:
236+
print(" Pump.fun logs:")
237+
for log in pump_logs[:MAX_LOGS_TO_SHOW]:
238+
print(f" {log}")
239+
if len(pump_logs) > MAX_LOGS_TO_SHOW:
240+
print(f" ... and {len(pump_logs) - MAX_LOGS_TO_SHOW} more logs")
241+
242+
243+
async def handle_transaction(log_data):
244+
"""Handle a transaction log notification."""
245+
signature = log_data.get("signature", "unknown")
246+
logs = log_data.get("logs", [])
247+
248+
# Check if this is a pump.fun bonding curve buy/sell transaction
249+
if not is_pump_bonding_curve_buysell(logs):
250+
return
251+
252+
display_transaction_info(signature, logs)
253+
254+
255+
async def process_websocket_message(websocket):
256+
"""Process incoming WebSocket messages."""
257+
try:
258+
response = await asyncio.wait_for(websocket.recv(), timeout=30)
259+
data = json.loads(response)
260+
261+
if "method" not in data or data["method"] != "logsNotification":
262+
return
263+
264+
log_data = data["params"]["result"]["value"]
265+
await handle_transaction(log_data)
266+
267+
except TimeoutError:
268+
print("No data received for 30 seconds")
269+
except websockets.exceptions.ConnectionClosed:
270+
print("WebSocket connection closed")
271+
raise
272+
except (json.JSONDecodeError, KeyError, ValueError) as e:
273+
print(f"Error processing message: {e}")
274+
275+
276+
async def listen_for_transactions():
277+
"""Main function to listen for wallet transactions."""
278+
print(f"Starting to monitor wallet: {WALLET_TO_TRACK}")
279+
print(f"Connecting to: {WSS_ENDPOINT}")
280+
print("Looking for pump.fun bonding curve buy/sell transactions only...")
281+
print("=" * 80)
282+
283+
while True:
284+
try:
285+
async with websockets.connect(WSS_ENDPOINT) as websocket:
286+
await subscribe_to_wallet_logs(websocket)
287+
ping_task = asyncio.create_task(keep_connection_alive(websocket))
288+
289+
try:
290+
while True:
291+
await process_websocket_message(websocket)
292+
except websockets.exceptions.ConnectionClosed:
293+
print("WebSocket connection closed. Reconnecting...")
294+
ping_task.cancel()
295+
296+
except (
297+
websockets.exceptions.WebSocketException,
298+
ConnectionError,
299+
OSError,
300+
) as e:
301+
print(f"Connection error: {e}")
302+
print(f"Reconnecting in {RECONNECT_DELAY} seconds...")
303+
await asyncio.sleep(RECONNECT_DELAY)
304+
305+
306+
def main():
307+
"""Main function to run the wallet transaction listener."""
308+
try:
309+
asyncio.run(listen_for_transactions())
310+
except KeyboardInterrupt:
311+
print("\nStopping wallet transaction listener...")
312+
except (ValueError, RuntimeError) as e:
313+
print(f"Error: {e}")
314+
sys.exit(1)
315+
316+
317+
if __name__ == "__main__":
318+
main()

0 commit comments

Comments
 (0)