Skip to content

Commit 9e94e10

Browse files
committed
added script for fetching ripple data
1 parent 2fc45d9 commit 9e94e10

File tree

1 file changed

+390
-0
lines changed

1 file changed

+390
-0
lines changed
Lines changed: 390 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,390 @@
1+
"""
2+
Ripple (XRP Ledger) snapshot extraction.
3+
This module fetches monthly account state snapshots from xrplcluster.com,
4+
stores them in SQLite, and exports CSVs. Balances are recorded in drops (whole numbers; the SQLite column is declared TEXT).
5+
"""
6+
7+
import requests
8+
import sqlite3
9+
import csv
10+
import os
11+
import logging
12+
import time
13+
from datetime import datetime, timezone, timedelta
14+
15+
# ----------------------------------------------------------------------
16+
# Configuration
17+
# ----------------------------------------------------------------------
18+
# JSON-RPC endpoint that every ledger query in this module is posted to.
RPC_URL = "https://xrplcluster.com"

# Directory holding the SQLite database, the log file and the CSV exports.
# It is created at import time if it does not exist yet.
OUTPUT_DIR = "/mnt/output"
os.makedirs(OUTPUT_DIR, exist_ok=True)
DB_FILE = os.path.join(OUTPUT_DIR, "balances.db")

# Send INFO-and-above records to run.log inside the output directory.
_LOG_PATH = os.path.join(OUTPUT_DIR, "run.log")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(message)s",
    filename=_LOG_PATH,
)

# Single HTTP session shared by all RPC requests issued from this module.
session = requests.Session()
30+
# ----------------------------------------------------------------------
31+
# RPC utilities
32+
# ----------------------------------------------------------------------
33+
34+
35+
def rpc_call(payload, retries: int = 3, delay: float = 0.4) -> dict:
    """Perform an RPC call with retries.

    The payload is POSTed as JSON to ``RPC_URL`` through the shared session.
    An attempt counts as failed when the request raises, the HTTP status is
    an error, the body is not valid JSON, or the decoded body carries
    ``result.status == "error"``. The last case matters because such a body
    arrives with a successful HTTP status, so without the check callers
    would receive an error payload as if it were data.

    Args:
        payload: JSON-serialisable request body.
        retries: Maximum number of attempts.
        delay: Seconds to sleep after a successful call (presumably to
            throttle requests against the public endpoint).

    Returns:
        The decoded JSON response.

    Raises:
        RuntimeError: If every attempt failed; the last underlying error is
            attached as ``__cause__``.
    """
    last_error = None
    for attempt in range(retries):
        try:
            resp = session.post(RPC_URL, json=payload, timeout=60)
            resp.raise_for_status()
            result = resp.json()
            body = result.get("result") if isinstance(result, dict) else None
            if isinstance(body, dict) and body.get("status") == "error":
                raise RuntimeError(
                    f"RPC error response: {body.get('error')} "
                    f"({body.get('error_message')})"
                )
        except Exception as e:
            last_error = e
            logging.warning(f"RPC call failed (attempt {attempt+1}): {e}")
            # Back off only when another attempt will follow.
            if attempt + 1 < retries:
                time.sleep(1)
        else:
            time.sleep(delay)
            return result
    raise RuntimeError("RPC call failed after retries") from last_error
48+
49+
50+
def get_complete_ledger_range() -> tuple[int, int]:
    """
    Returns (min_index, max_index) from server_info.complete_ledgers.

    The field is a comma-separated list whose entries are either a range
    ("A-B") or a single ledger index ("A"); both forms are accepted. The
    overall minimum and maximum across all entries are returned, so gaps
    between disjoint ranges are not reflected in the result.

    Raises:
        RuntimeError: If the server reports no complete ledgers (an empty
            string or the literal "empty").
    """
    result = rpc_call({"method": "server_info", "params": [{}]})
    complete = result["result"]["info"]["complete_ledgers"]

    lows, highs = [], []
    for part in complete.split(","):
        part = part.strip()
        if not part or part == "empty":
            continue
        first, sep, last = part.partition("-")
        lows.append(int(first))
        # A lone index is a range of length one.
        highs.append(int(last) if sep else int(first))

    if not lows:
        raise RuntimeError(
            f"Server reports no complete ledgers (complete_ledgers={complete!r})"
        )
    return min(lows), max(highs)
64+
65+
66+
def get_ledger_close_time(ledger_index: int) -> datetime:
    """Look up a ledger header and return its close time as an aware UTC datetime.

    The human-readable ``close_time_human`` field is used when it is present
    and ends in " UTC"; otherwise the integer ``close_time`` field is read as
    seconds since 2000-01-01T00:00:00Z (the ripple epoch).

    Raises:
        ValueError: If neither field yields a usable close time.
    """
    request = {
        "method": "ledger",
        "params": [
            {
                "ledger_index": ledger_index,
                "transactions": False,
                "accounts": False,
                "full": False,
                "expand": False,
            }
        ],
    }
    ledger = rpc_call(request)["result"]["ledger"]

    human = ledger.get("close_time_human")
    if human and human.endswith(" UTC"):
        # Drop the zone suffix, then anything after the first "." (fractional
        # seconds), leaving a "YYYY-Mon-DD HH:MM:SS" stamp.
        stamp = human.replace(" UTC", "").partition(".")[0]
        parsed = datetime.strptime(stamp, "%Y-%b-%d %H:%M:%S")
        return parsed.replace(tzinfo=timezone.utc)

    seconds = ledger.get("close_time")
    if not isinstance(seconds, int):
        raise ValueError("close_time not available in ledger response")
    return datetime(2000, 1, 1, tzinfo=timezone.utc) + timedelta(seconds=seconds)
101+
102+
103+
# ----------------------------------------------------------------------
104+
# Database utilities
105+
# ----------------------------------------------------------------------
106+
107+
108+
def init_db(year, month):
    """Open the snapshot database and create the tables one month needs.

    Two tables are created if missing: ``accounts_<year>_<MM>`` (account,
    balance) for the month's balances, and the shared ``progress`` table that
    holds the ledger index and last pagination marker per (year, month).

    Returns:
        A ``(connection, table_name)`` tuple; the caller owns the connection.
    """
    conn = sqlite3.connect(DB_FILE)
    table_name = f"accounts_{year}_{month:02d}"

    accounts_ddl = f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            account TEXT PRIMARY KEY,
            balance TEXT
        )
        """
    progress_ddl = """
        CREATE TABLE IF NOT EXISTS progress (
            year INTEGER,
            month INTEGER,
            ledger_index INTEGER,
            last_marker TEXT,
            PRIMARY KEY (year, month)
        )
        """
    for ddl in (accounts_ddl, progress_ddl):
        conn.execute(ddl)

    conn.commit()
    return conn, table_name
135+
136+
137+
def get_exact_snapshot_ledger_index(year: int, month: int) -> int:
    """
    Return the lowest ledger index whose close time is at or after 00:00:00 UTC
    on the first day of the month following (year, month).

    The index is located by binary search over the range reported by
    get_complete_ledger_range(), fetching one close time per probe.

    Raises:
        RuntimeError: If no ledger in the range closes at or after the target.
    """
    # Day 1 plus 32 days always lands in the following month; snapping back to
    # day 1 gives that month's first instant.
    month_start = datetime(year, month, 1, tzinfo=timezone.utc)
    target = (month_start + timedelta(days=32)).replace(day=1)

    low, high = get_complete_ledger_range()
    found = None
    while low <= high:
        probe = (low + high) // 2
        if get_ledger_close_time(probe) < target:
            low = probe + 1
        else:
            # Candidate found; keep looking for an earlier one.
            found = probe
            high = probe - 1

    if found is not None:
        return found
    raise RuntimeError(
        "No ledger found at or after the target time within complete range"
    )
163+
164+
165+
def save_progress(conn, year, month, ledger_index, marker):
    """Store the ledger index and pagination marker for (year, month).

    Inserts a row into ``progress`` or, when one already exists for that
    month, overwrites its ledger index and marker. The change is committed
    before returning.
    """
    upsert_sql = """
        INSERT INTO progress (year, month, ledger_index, last_marker)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(year, month) DO UPDATE
        SET ledger_index=excluded.ledger_index,
        last_marker=excluded.last_marker
        """
    values = (year, month, ledger_index, marker)
    conn.execute(upsert_sql, values)
    conn.commit()
178+
179+
180+
def load_progress(conn, year, month):
    """Fetch the saved ``(ledger_index, last_marker)`` pair for (year, month).

    Returns ``(None, None)`` when the ``progress`` table has no row for that
    month.
    """
    query = "SELECT ledger_index, last_marker FROM progress WHERE year=? AND month=?"
    saved = conn.execute(query, (year, month)).fetchone()
    if saved:
        return saved
    return (None, None)
188+
189+
190+
def upsert_balance(conn, table_name, account, balance):
    """Write one account's balance into the given month table.

    A new row is inserted, or the existing row for ``account`` has its balance
    replaced. Nothing is committed here; the caller decides when to commit.
    """
    statement = f"""
        INSERT INTO {table_name} (account, balance)
        VALUES (?, ?)
        ON CONFLICT(account) DO UPDATE SET balance=excluded.balance
        """
    conn.execute(statement, (account, balance))
201+
202+
203+
# ----------------------------------------------------------------------
204+
# Snapshots
205+
# ----------------------------------------------------------------------
206+
207+
208+
def fetch_snapshot(ledger_index, conn, table_name, year, month):
    """Download every AccountRoot entry of one ledger into ``table_name``.

    Pages through ``ledger_data`` (account entries only, 1000 per request).
    After each page the balances are committed and the returned marker is
    saved, so an interrupted run can continue from the last saved marker.

    A saved marker is reused only when it was recorded for the same
    ``ledger_index``; otherwise the download restarts from the first page.

    Args:
        ledger_index: Ledger whose state is downloaded.
        conn: Open SQLite connection holding ``table_name`` and ``progress``.
        table_name: Month table receiving (account, balance) rows.
        year: Snapshot year, used as the progress key and in log lines.
        month: Snapshot month, used as the progress key and in log lines.

    Raises:
        RuntimeError: If a page comes back as an RPC error. Without this
            check an error payload (no ``state``, no ``marker``) would be
            indistinguishable from the final page of a complete snapshot.
    """
    params = {"ledger_index": ledger_index, "type": "account", "limit": 1000}
    payload = {"method": "ledger_data", "params": [params]}

    # Resume from last marker if available and recorded for this ledger
    last_ledger, last_marker = load_progress(conn, year, month)
    if last_marker:
        if last_ledger == ledger_index:
            params["marker"] = last_marker
            logging.info(f"Resuming from marker {last_marker}")
        else:
            logging.warning(
                f"Ignoring saved marker for ledger {last_ledger}; "
                f"restarting snapshot at ledger {ledger_index}"
            )

    total_accounts = 0

    while True:
        result = rpc_call(payload)
        body = result.get("result", {})
        if body.get("status") == "error" or "error" in body:
            raise RuntimeError(
                f"ledger_data failed at ledger {ledger_index}: {body.get('error')}"
            )

        for entry in body.get("state", []):
            if entry["LedgerEntryType"] == "AccountRoot":
                # Balance is stored as reported, converted to an integer
                # number of drops.
                upsert_balance(conn, table_name, entry["Account"], int(entry["Balance"]))
                total_accounts += 1
                if total_accounts % 1000 == 0:
                    logging.info(
                        f"{year} - {month} Total accounts saved: {total_accounts}"
                    )

        # Commit after each page
        conn.commit()

        marker = body.get("marker")
        save_progress(conn, year, month, ledger_index, marker)

        # No marker means the server has no further pages.
        if not marker:
            break
        params["marker"] = marker

    logging.info(f"Snapshot complete. Final total accounts saved: {total_accounts}")
249+
250+
251+
def export_month(year: int, month: int):
    """Snapshot all account balances for one month and export them to CSV.

    Steps: resolve the anchor ledger (first ledger closing at or after the
    start of the following month, UTC), record it in ``snapshot_metadata``,
    download the account state into the month table, then write every
    (account, balance) row to
    ``<OUTPUT_DIR>/ripple_<first day of next month>_raw_data.csv``.
    The CSV has no header row.

    A failure while downloading the snapshot is logged and the function
    returns without writing the CSV. The database connection is closed on
    every exit path.
    """
    # Resolve exact anchor: first validated ledger of the next month (UTC)
    ledger_index = get_exact_snapshot_ledger_index(year, month)
    close_time = get_ledger_close_time(ledger_index)

    logging.info(
        f"Fetching snapshot for {year}-{month:02d} at ledger {ledger_index} (close_time {close_time.isoformat()})"
    )

    # Compute the first day of the next month: day 28 plus 4 days always
    # falls in the following month.
    snapshot_date = datetime(year, month, 1)
    next_month_date = (snapshot_date.replace(day=28) + timedelta(days=4)).replace(day=1)

    filename = os.path.join(
        OUTPUT_DIR, f"ripple_{next_month_date.strftime('%Y-%m-%d')}_raw_data.csv"
    )

    conn, table_name = init_db(year, month)
    try:
        ensure_metadata_table(conn)
        record_snapshot_metadata(conn, year, month, ledger_index, close_time)

        try:
            fetch_snapshot(ledger_index, conn, table_name, year, month)
        except Exception as e:
            logging.error(f"Error fetching ledger {ledger_index}: {e}")
            return

        # Export final balances to CSV, streaming rows from the cursor
        # instead of loading the whole table into memory.
        cur = conn.cursor()
        cur.execute(f"SELECT account, balance FROM {table_name}")
        exported = 0
        with open(filename, "w", newline="") as f:
            writer = csv.writer(f)
            for row in cur:
                writer.writerow(row)
                exported += 1

        logging.info(f"Finished. Saved {exported} accounts to {filename}")
    finally:
        conn.close()
291+
292+
293+
def export_range(start_year: int, start_month: int, end_year: int, end_month: int):
    """
    Run export_month for each month from (start_year, start_month) up to and
    including (end_year, end_month), in chronological order.
    """
    year, month = start_year, start_month
    last = (end_year, end_month)
    while (year, month) <= last:
        logging.info(f"Starting export for {year}-{month:02d}")
        export_month(year, month)
        # Step to the next calendar month, rolling the year after December.
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
308+
309+
310+
def export_csv_from_sqlite(year: int, month: int):
    """
    Export account balances for a given year and month from SQLite to CSV.
    Reads from table accounts_YYYY_MM and writes to /mnt/output/xrp_MM_YYYY.csv

    The CSV starts with an "account,balance" header row. If the month table
    does not exist, an error is logged and no file is written. Any exception
    during the export is logged rather than raised, and the database
    connection is closed on every exit path.
    """
    table_name = f"accounts_{year}_{month:02d}"
    csv_filename = f"xrp_{month:02d}_{year}.csv"
    csv_path = os.path.join(OUTPUT_DIR, csv_filename)

    conn = None
    try:
        conn = sqlite3.connect(DB_FILE)
        cur = conn.cursor()

        # Check if table exists
        cur.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
            (table_name,),
        )
        if not cur.fetchone():
            logging.error(f"Table {table_name} does not exist in {DB_FILE}")
            return

        # Query all rows and stream them to the CSV
        cur.execute(f"SELECT account, balance FROM {table_name}")
        row_count = 0
        with open(csv_path, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["account", "balance"])
            for row in cur:
                writer.writerow(row)
                row_count += 1

        logging.info(f"Successfully exported {row_count} rows to {csv_path}")
    except Exception as e:
        logging.error(f"Failed to export CSV for {year}-{month:02d}: {e}")
    finally:
        # Previously the connection was closed only on the success paths.
        if conn is not None:
            conn.close()
347+
348+
349+
def ensure_metadata_table(conn):
    """Create the ``snapshot_metadata`` table if it is missing, then commit.

    The table holds one row per (year, month) with the ledger index and close
    time used for that month's snapshot.
    """
    ddl = """
        CREATE TABLE IF NOT EXISTS snapshot_metadata (
            year INTEGER,
            month INTEGER,
            ledger_index INTEGER,
            close_time TEXT,
            PRIMARY KEY (year, month)
        )
        """
    conn.execute(ddl)
    conn.commit()
363+
364+
365+
def record_snapshot_metadata(
    conn, year: int, month: int, ledger_index: int, close_time: datetime
):
    """Store which ledger (and its close time) anchors the (year, month) snapshot.

    Inserts into ``snapshot_metadata`` or overwrites the existing row for that
    month, commits, and logs what was recorded. The close time is stored as
    its ISO-8601 string.
    """
    close_iso = close_time.isoformat()
    upsert_sql = """
        INSERT INTO snapshot_metadata (year, month, ledger_index, close_time)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(year, month) DO UPDATE
        SET ledger_index = excluded.ledger_index,
        close_time = excluded.close_time
        """
    conn.execute(upsert_sql, (year, month, ledger_index, close_iso))
    conn.commit()
    logging.info(
        f"Recorded metadata: {year}-{month:02d} → ledger {ledger_index}, close_time {close_iso}"
    )
383+
384+
385+
# ----------------------------------------------------------------------
386+
# Entry point
387+
# ----------------------------------------------------------------------
388+
389+
if __name__ == "__main__":
    # Snapshot month to export when the module is run as a script:
    # October 2025.
    snapshot_year, snapshot_month = 2025, 10
    export_month(snapshot_year, snapshot_month)

0 commit comments

Comments
 (0)