Skip to content

Commit f8b82a3

Browse files
committed
Add v1 of wallet explorer
1 parent 20d4cce commit f8b82a3

10 files changed

+1165
-408
lines changed

backend/api/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,5 @@
1616
user_retention_explorer_api,
1717
user_retention_summary_api,
1818
vaults_api,
19+
wallet_activity_api,
1920
)

backend/api/high_leverage_api.py

Lines changed: 242 additions & 109 deletions
Large diffs are not rendered by default.

backend/api/user_retention_explorer_api.py

Lines changed: 173 additions & 66 deletions
Large diffs are not rendered by default.

backend/api/wallet_activity_api.py

Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
import logging
2+
import os
3+
import warnings
4+
from datetime import datetime, timedelta
5+
from typing import Any, Dict, List, Set, Tuple
6+
7+
import pandas as pd
8+
from dateutil import parser, tz
9+
from fastapi import APIRouter, HTTPException, Query
10+
from pyathena import connect
11+
12+
warnings.filterwarnings("ignore", category=UserWarning)
13+
14+
logging.basicConfig(level=logging.INFO)
15+
logger = logging.getLogger(__name__)
16+
router = APIRouter()
17+
18+
DATABASE = os.environ.get("ATHENA_DATABASE", "mainnet-beta-archive")
19+
REGION = os.environ.get("AWS_REGION", "eu-west-1")
20+
S3_OUTPUT = os.environ.get(
21+
"ATHENA_S3_OUTPUT", "s3://mainnet-beta-data-ingestion-bucket/athena/"
22+
)
23+
24+
QUOTE_ASSET_SCALE_FACTOR = 1e6
25+
UTC = tz.tzutc()
26+
27+
28+
def dt_from_ms(ms: int) -> datetime:
29+
return datetime.fromtimestamp(ms / 1_000, tz=UTC)
30+
31+
32+
def partition_tuples(start: datetime, days: int) -> Set[Tuple[str, str, str]]:
33+
return {
34+
(d.strftime("%Y"), d.strftime("%m"), d.strftime("%d"))
35+
for d in (start + timedelta(n) for n in range(days))
36+
}
37+
38+
39+
def partition_pred(parts: Set[Tuple[str, str, str]]) -> str:
40+
lines = [f"(year='{y}' AND month='{m}' AND day='{d}')" for y, m, d in sorted(parts)]
41+
return " OR ".join(lines)
42+
43+
44+
def sql_get_all_traders_since_date(start_date: datetime) -> str:
45+
"""Get all unique traders who have traded since the start date."""
46+
now = datetime.now(tz=UTC)
47+
parts = partition_pred(partition_tuples(start_date, (now - start_date).days + 1))
48+
start_ts = int(start_date.timestamp())
49+
50+
full_query = f"""
51+
SELECT DISTINCT "user"
52+
FROM eventtype_orderrecord
53+
WHERE ({parts}) AND CAST(ts AS BIGINT) >= {start_ts}
54+
"""
55+
56+
print(f"Sql get all traders since date:\n {full_query}")
57+
return full_query
58+
59+
60+
def sql_get_user_first_trade_date(users: List[str]) -> str:
61+
"""Get the first trade date for each user."""
62+
if not users:
63+
return ""
64+
user_list = "', '".join(users)
65+
66+
full_query = f"""
67+
SELECT
68+
"user",
69+
MIN(CAST(ts AS BIGINT)) as first_trade_ts
70+
FROM eventtype_orderrecord
71+
WHERE "user" IN ('{user_list}')
72+
GROUP BY "user"
73+
"""
74+
75+
print(f"Sql get user first trade date:\n {full_query}")
76+
return full_query
77+
78+
79+
def sql_get_user_last_trade_before_date(users: List[str], before_date: datetime) -> str:
80+
"""Get the last trade date before a given date for each user."""
81+
if not users:
82+
return ""
83+
84+
user_list = "', '".join(users)
85+
before_ts = int(before_date.timestamp())
86+
87+
start_lookup = before_date - timedelta(days=15)
88+
days_lookup = (before_date - start_lookup).days
89+
parts = partition_pred(partition_tuples(start_lookup, days_lookup))
90+
91+
full_query = f"""
92+
SELECT
93+
"user",
94+
MAX(CAST(ts AS BIGINT)) as last_trade_ts
95+
FROM eventtype_orderrecord
96+
WHERE ({parts}) AND CAST(ts AS BIGINT) < {before_ts}
97+
AND "user" IN ('{user_list}')
98+
GROUP BY "user"
99+
"""
100+
101+
print(f"Sql get user last trade before date:\n {full_query}")
102+
return full_query
103+
104+
105+
def sql_get_user_volume_since_date(users: List[str], start_date: datetime) -> str:
106+
"""Get trading volume for users since a given date."""
107+
if not users:
108+
return ""
109+
110+
user_list = "', '".join(users)
111+
start_ts = int(start_date.timestamp())
112+
now = datetime.now(tz=UTC)
113+
days_since = (now - start_date).days + 1
114+
parts = partition_pred(partition_tuples(start_date, days_since))
115+
116+
full_query = f"""
117+
WITH user_trades AS (
118+
SELECT
119+
taker AS user,
120+
CAST(quoteassetamountfilled AS DOUBLE) / {QUOTE_ASSET_SCALE_FACTOR} AS quote_volume
121+
FROM eventtype_traderecord
122+
WHERE ({parts}) AND CAST(ts AS BIGINT) >= {start_ts}
123+
AND taker IN ('{user_list}')
124+
UNION ALL
125+
SELECT
126+
maker AS user,
127+
CAST(quoteassetamountfilled AS DOUBLE) / {QUOTE_ASSET_SCALE_FACTOR} AS quote_volume
128+
FROM eventtype_traderecord
129+
WHERE ({parts}) AND CAST(ts AS BIGINT) >= {start_ts}
130+
AND maker IN ('{user_list}')
131+
)
132+
SELECT
133+
user,
134+
SUM(quote_volume) AS total_volume
135+
FROM user_trades
136+
GROUP BY user
137+
"""
138+
139+
print(f"Sql get user volume since date:\n {full_query}")
140+
return full_query
141+
142+
143+
async def calculate_wallet_activity(since_date_str: str) -> Dict[str, Any]:
144+
conn = None
145+
try:
146+
since_date = parser.parse(since_date_str).replace(tzinfo=UTC)
147+
logger.info(
148+
f"Connecting to Athena. S3 staging: {S3_OUTPUT}, Region: {REGION}, DB: {DATABASE}"
149+
)
150+
conn = connect(
151+
s3_staging_dir=S3_OUTPUT, region_name=REGION, schema_name=DATABASE
152+
)
153+
logger.info("Successfully connected to Athena.")
154+
logger.info(f"Getting all traders since {since_date_str}...")
155+
q_all_traders = sql_get_all_traders_since_date(since_date)
156+
all_traders_df = pd.read_sql(q_all_traders, conn)
157+
all_traders = all_traders_df["user"].tolist()
158+
159+
if not all_traders:
160+
return {
161+
"analysis_date": since_date_str,
162+
"new_wallets_count": 0,
163+
"new_wallets_volume": 0.0,
164+
"reactivated_wallets_count": 0,
165+
"reactivated_wallets_volume": 0.0,
166+
"active_wallets_count": 0,
167+
"active_wallets_volume": 0.0,
168+
"total_wallets_count": 0,
169+
"total_volume": 0.0,
170+
"wallet_data": [],
171+
}
172+
173+
logger.info("Getting first trade dates for all traders...")
174+
q_first_trades = sql_get_user_first_trade_date(all_traders)
175+
first_trades_df = pd.read_sql(q_first_trades, conn)
176+
logger.info("Getting last trade dates before analysis date...")
177+
q_last_trades = sql_get_user_last_trade_before_date(all_traders, since_date)
178+
last_trades_df = pd.read_sql(q_last_trades, conn)
179+
logger.info("Calculating volume since analysis date...")
180+
q_volume = sql_get_user_volume_since_date(all_traders, since_date)
181+
volume_df = pd.read_sql(q_volume, conn)
182+
183+
result = {
184+
"analysis_date": since_date_str,
185+
"volume_df": volume_df.to_dict(orient="records"),
186+
"first_trades_df": first_trades_df.to_dict(orient="records"),
187+
"last_trades_df": last_trades_df.to_dict(orient="records"),
188+
}
189+
190+
logger.info(f"Successfully calculated wallet activity for {since_date_str}.")
191+
return result
192+
193+
except Exception as e:
194+
logger.error(f"Error in calculate_wallet_activity: {e}", exc_info=True)
195+
raise HTTPException(
196+
status_code=500, detail=f"Failed to process wallet activity data: {str(e)}"
197+
)
198+
finally:
199+
if conn:
200+
conn.close()
201+
logger.info("Athena connection closed.")
202+
203+
204+
@router.get("/calculate", response_model=Dict[str, Any])
205+
async def get_wallet_activity(
206+
since_date: str = Query(
207+
..., description="The date to analyze wallet activity from (YYYY-MM-DD)."
208+
),
209+
):
210+
"""
211+
Analyzes wallet activity since a given date, categorizing wallets into:
212+
- New Wallets: Wallets that connected for the first time after the given date
213+
- Reactivated Wallets: Wallets that had no trading activity for more than 15 days prior to the given date, but traded again after
214+
- Active Wallets: Wallets that did trade within 15 days before the given date, and continued trading after
215+
"""
216+
try:
217+
logger.info(f"Received request for /calculate: since_date='{since_date}'")
218+
result_data = await calculate_wallet_activity(since_date)
219+
return result_data
220+
except HTTPException as http_exc:
221+
raise http_exc
222+
except Exception as e:
223+
logger.error(f"Unhandled error in /calculate endpoint: {e}", exc_info=True)
224+
raise HTTPException(
225+
status_code=500,
226+
detail="An internal server error occurred during calculation.",
227+
)

backend/app.py

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
backend_health,
1414
deposits_api,
1515
health,
16+
high_leverage_api,
1617
liquidation_curves_api,
1718
market_recommender_api,
1819
metadata,
@@ -22,10 +23,10 @@
2223
price_shock,
2324
snapshot,
2425
ucache,
25-
vaults_api,
26-
high_leverage_api,
27-
user_retention_summary_api,
2826
user_retention_explorer_api,
27+
user_retention_summary_api,
28+
vaults_api,
29+
wallet_activity_api,
2930
)
3031
from backend.middleware.cache_middleware import CacheMiddleware
3132
from backend.middleware.readiness import ReadinessMiddleware
@@ -78,9 +79,15 @@ async def lifespan(app: FastAPI):
7879
app.add_middleware(CacheMiddleware, state=state, cache_dir="cache")
7980

8081
app.include_router(health.router, prefix="/api/health", tags=["health"])
81-
app.include_router(backend_health.router, prefix="/api/backend-health", tags=["backend-health"])
82+
app.include_router(
83+
backend_health.router, prefix="/api/backend-health", tags=["backend-health"]
84+
)
8285
app.include_router(metadata.router, prefix="/api/metadata", tags=["metadata"])
83-
app.include_router(liquidation_curves_api.router, prefix="/api/liquidation-curves", tags=["liquidation-curves"])
86+
app.include_router(
87+
liquidation_curves_api.router,
88+
prefix="/api/liquidation-curves",
89+
tags=["liquidation-curves"],
90+
)
8491
app.include_router(price_shock.router, prefix="/api/price-shock", tags=["price-shock"])
8592
app.include_router(
8693
asset_liability.router, prefix="/api/asset-liability", tags=["asset-liability"]
@@ -91,11 +98,33 @@ async def lifespan(app: FastAPI):
9198
app.include_router(pnl_api.router, prefix="/api/pnl", tags=["pnl"])
9299
app.include_router(vaults_api.router, prefix="/api/vaults", tags=["vaults"])
93100
app.include_router(positions.router, prefix="/api/positions", tags=["positions"])
94-
app.include_router(market_recommender_api.router, prefix="/api/market-recommender", tags=["market-recommender"])
95-
app.include_router(open_interest_api.router, prefix="/api/open-interest", tags=["open-interest"])
96-
app.include_router(high_leverage_api.router, prefix="/api/high-leverage", tags=["high-leverage"])
97-
app.include_router(user_retention_summary_api.router, prefix="/api/user-retention-summary", tags=["user-retention-summary"])
98-
app.include_router(user_retention_explorer_api.router, prefix="/api/user-retention-explorer", tags=["user-retention-explorer"])
101+
app.include_router(
102+
market_recommender_api.router,
103+
prefix="/api/market-recommender",
104+
tags=["market-recommender"],
105+
)
106+
app.include_router(
107+
open_interest_api.router, prefix="/api/open-interest", tags=["open-interest"]
108+
)
109+
app.include_router(
110+
high_leverage_api.router, prefix="/api/high-leverage", tags=["high-leverage"]
111+
)
112+
app.include_router(
113+
user_retention_summary_api.router,
114+
prefix="/api/user-retention-summary",
115+
tags=["user-retention-summary"],
116+
)
117+
app.include_router(
118+
user_retention_explorer_api.router,
119+
prefix="/api/user-retention-explorer",
120+
tags=["user-retention-explorer"],
121+
)
122+
app.include_router(
123+
wallet_activity_api.router,
124+
prefix="/api/wallet-activity",
125+
tags=["wallet-activity"],
126+
)
127+
99128

100129
# NOTE: All other routes should be in /api/* within the /api folder. Routes outside of /api are not exposed in k8s
101130
@app.get("/")

requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ uvicorn==0.34.0
1212
requests==2.32.4
1313
plotly==6.0.0
1414
anchorpy==0.21.0
15-
driftpy==0.8.59
15+
driftpy==0.8.64
1616
ccxt==4.2.17
1717
rich>=10.14.0
1818
aiofiles==24.1.0

src/main.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,20 @@
1212
from page.backend import backend_page
1313
from page.deposits_page import deposits_page
1414
from page.health import health_page
15+
from page.high_leverage_page import high_leverage_page
1516
from page.liquidation_curves_page import liquidation_curves_page
1617
from page.market_inspector_page import market_inspector_page
18+
from page.market_recommender_page import market_recommender_page
19+
from page.open_interest_page import open_interest_page
1720
from page.orderbook import orderbook_page
1821
from page.pnl_page import pnl_page
1922
from page.price_shock import price_shock_cached_page
2023
from page.swap import show as swap_page
24+
from page.user_retention_explorer_page import user_retention_explorer_page
25+
from page.user_retention_summary_page import user_retention_summary_page
2126
from page.vaults_page import vaults_page
27+
from page.wallet_activity_page import wallet_activity_page
2228
from page.welcome import welcome_page
23-
from page.market_recommender_page import market_recommender_page
24-
from page.open_interest_page import open_interest_page
25-
from page.high_leverage_page import high_leverage_page
26-
from page.user_retention_summary_page import user_retention_summary_page
27-
from page.user_retention_explorer_page import user_retention_explorer_page
2829

2930
load_dotenv()
3031

@@ -163,6 +164,12 @@ def apply_custom_css(css):
163164
title="User Retention Explorer",
164165
icon="🔍",
165166
),
167+
st.Page(
168+
needs_backend(wallet_activity_page),
169+
url_path="wallet-activity",
170+
title="Wallet Activity",
171+
icon="👛",
172+
),
166173
]
167174

168175
pg = st.navigation(

0 commit comments

Comments
 (0)