Skip to content

Commit 3f582dd

Browse files
committed
Re-add sync endpoints for accounts, lists and direct donations
1 parent e3c1086 commit 3f582dd

File tree

3 files changed

+639
-0
lines changed

3 files changed

+639
-0
lines changed

api/sync.py

Lines changed: 399 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,399 @@
1+
"""
2+
Sync endpoints - fetch data from blockchain RPC and store in database.
3+
4+
Called by frontend after user creates/updates a list/registration or account profile.
5+
Replaces the 24/7 indexer - now we fetch on-demand when user acts.
6+
7+
Endpoints:
8+
Lists:
9+
POST /api/v1/lists/{list_id}/sync - Sync single list
10+
POST /api/v1/lists/{list_id}/registrations/sync - Sync all registrations
11+
POST /api/v1/lists/{list_id}/registrations/{registrant_id}/sync - Sync single registration
12+
13+
Accounts:
14+
POST /api/v1/accounts/{account_id}/sync - Sync account profile and recalculate stats
15+
"""
16+
import base64
17+
import json
18+
import logging
19+
from datetime import datetime
20+
21+
import requests
22+
from django.conf import settings
23+
from drf_spectacular.utils import OpenApiResponse, extend_schema
24+
from rest_framework.response import Response
25+
from rest_framework.views import APIView
26+
27+
from accounts.models import Account
28+
from donations.models import Donation
29+
from lists.models import List, ListRegistration
30+
31+
logger = logging.getLogger(__name__)
32+
33+
LISTS_CONTRACT = f"lists.{settings.POTLOCK_TLA}"
34+
35+
36+
def fetch_from_rpc(method_name: str, args: dict = None, contract_id: str = None, timeout: int = 60):
37+
"""
38+
Fetch data from NEAR RPC with multiple fallbacks and retry logic.
39+
40+
Order of attempts:
41+
1. Web4 RPC (most efficient)
42+
2. FastNear free RPC
43+
3. Official NEAR RPC (most reliable but rate-limited)
44+
"""
45+
from requests.adapters import HTTPAdapter
46+
from urllib3.util.retry import Retry
47+
48+
account_id = contract_id or LISTS_CONTRACT
49+
50+
# Create session with retry logic
51+
session = requests.Session()
52+
retries = Retry(total=2, backoff_factor=0.5, status_forcelist=[502, 503, 504])
53+
session.mount("https://", HTTPAdapter(max_retries=retries))
54+
55+
# Try Web4 RPC first (skip for heavy methods that often fail)
56+
heavy_methods = ["get_registrations_for_list"]
57+
if method_name not in heavy_methods:
58+
web4_url = f"{settings.FASTNEAR_RPC_URL}/account/{account_id}/view/{method_name}"
59+
try:
60+
response = session.post(web4_url, json=args or {}, timeout=timeout)
61+
if response.status_code == 200:
62+
return response.json()
63+
logger.warning(f"web4 RPC returned status {response.status_code}")
64+
except Exception as e:
65+
logger.warning(f"web4 RPC failed: {e}")
66+
67+
# RPC endpoints to try in order
68+
rpc_endpoints = [
69+
"https://free.rpc.fastnear.com" if settings.ENVIRONMENT != "testnet" else "https://test.rpc.fastnear.com",
70+
"https://rpc.mainnet.near.org" if settings.ENVIRONMENT != "testnet" else "https://rpc.testnet.near.org",
71+
]
72+
73+
args_base64 = base64.b64encode(json.dumps(args or {}).encode()).decode()
74+
payload = {
75+
"jsonrpc": "2.0",
76+
"id": "dontcare",
77+
"method": "query",
78+
"params": {
79+
"request_type": "call_function",
80+
"account_id": account_id,
81+
"method_name": method_name,
82+
"args_base64": args_base64,
83+
"finality": "optimistic"
84+
}
85+
}
86+
87+
last_error = None
88+
for rpc_url in rpc_endpoints:
89+
try:
90+
logger.info(f"Trying RPC: {rpc_url} for {method_name}")
91+
response = session.post(rpc_url, json=payload, timeout=timeout)
92+
result = response.json()
93+
94+
if "error" in result:
95+
logger.warning(f"RPC error from {rpc_url}: {result['error']}")
96+
last_error = result["error"]
97+
continue
98+
99+
if "error" in result.get("result", {}):
100+
logger.warning(f"Contract error from {rpc_url}: {result['result']['error']}")
101+
last_error = result["result"]["error"]
102+
continue
103+
104+
# Decode result bytes to JSON
105+
if "result" not in result.get("result", {}):
106+
return None
107+
result_bytes = bytes(result["result"]["result"])
108+
return json.loads(result_bytes.decode())
109+
110+
except requests.exceptions.Timeout:
111+
logger.warning(f"RPC {rpc_url} timed out after {timeout}s")
112+
last_error = f"Timeout after {timeout}s"
113+
except Exception as e:
114+
logger.warning(f"RPC {rpc_url} failed: {e}")
115+
last_error = str(e)
116+
117+
raise Exception(f"All RPC endpoints failed. Last error: {last_error}")
118+
119+
120+
class ListSyncAPI(APIView):
121+
"""
122+
Sync a list from blockchain to database.
123+
124+
Called by frontend after user creates a list.
125+
Fetches current state from RPC, creates/updates in DB.
126+
"""
127+
128+
@extend_schema(
129+
summary="Sync list from blockchain",
130+
responses={
131+
200: OpenApiResponse(description="List synced"),
132+
404: OpenApiResponse(description="List not found on chain"),
133+
502: OpenApiResponse(description="RPC failed"),
134+
}
135+
)
136+
def post(self, request, list_id: int):
137+
try:
138+
# Fetch from RPC
139+
data = fetch_from_rpc("get_list", {"list_id": int(list_id)})
140+
141+
if not data:
142+
return Response({"error": "List not found on chain"}, status=404)
143+
144+
# Check if already exists
145+
existing_list = List.objects.filter(on_chain_id=int(list_id)).first()
146+
147+
if existing_list:
148+
# Update existing list
149+
existing_list.name = data["name"]
150+
existing_list.description = data.get("description", "")
151+
existing_list.cover_image_url = data.get("cover_image_url")
152+
existing_list.admin_only_registrations = data.get("admin_only_registrations", False)
153+
existing_list.default_registration_status = data.get("default_registration_status", "Pending")
154+
existing_list.updated_at = datetime.fromtimestamp(data["updated_at"] / 1000)
155+
existing_list.save()
156+
157+
# Update admins
158+
existing_list.admins.clear()
159+
for admin_id in data.get("admins", []):
160+
admin, _ = Account.objects.get_or_create(id=admin_id)
161+
existing_list.admins.add(admin)
162+
163+
return Response({
164+
"success": True,
165+
"message": "List updated",
166+
"on_chain_id": list_id
167+
})
168+
169+
# Create list (on_chain_id is the blockchain ID, id is auto-generated)
170+
list_obj = List.objects.create(
171+
on_chain_id=data["id"],
172+
owner_id=data["owner"],
173+
name=data["name"],
174+
description=data.get("description", ""),
175+
cover_image_url=data.get("cover_image_url"),
176+
admin_only_registrations=data.get("admin_only_registrations", False),
177+
default_registration_status=data.get("default_registration_status", "Pending"),
178+
created_at=datetime.fromtimestamp(data["created_at"] / 1000),
179+
updated_at=datetime.fromtimestamp(data["updated_at"] / 1000),
180+
)
181+
182+
# Create owner account
183+
Account.objects.get_or_create(id=data["owner"])
184+
185+
# Add admins
186+
for admin_id in data.get("admins", []):
187+
admin, _ = Account.objects.get_or_create(id=admin_id)
188+
list_obj.admins.add(admin)
189+
190+
return Response({
191+
"success": True,
192+
"message": "List created",
193+
"on_chain_id": list_obj.on_chain_id
194+
})
195+
196+
except Exception as e:
197+
logger.error(f"Error syncing list {list_id}: {e}")
198+
return Response({"error": str(e)}, status=502)
199+
200+
201+
class ListRegistrationsSyncAPI(APIView):
202+
"""
203+
Sync all registrations for a list.
204+
205+
Called after user registers to a list.
206+
"""
207+
208+
@extend_schema(
209+
summary="Sync all registrations for a list",
210+
responses={
211+
200: OpenApiResponse(description="Registrations synced"),
212+
404: OpenApiResponse(description="List not found"),
213+
502: OpenApiResponse(description="RPC failed"),
214+
}
215+
)
216+
def post(self, request, list_id: int):
217+
try:
218+
# Ensure list exists
219+
try:
220+
list_obj = List.objects.get(on_chain_id=int(list_id))
221+
except List.DoesNotExist:
222+
# Sync list first
223+
list_sync = ListSyncAPI()
224+
resp = list_sync.post(request, list_id)
225+
if resp.status_code != 200:
226+
return Response({"error": "List not found"}, status=404)
227+
list_obj = List.objects.get(on_chain_id=int(list_id))
228+
229+
# Fetch registrations from RPC (use longer timeout for large lists)
230+
registrations = fetch_from_rpc("get_registrations_for_list", {"list_id": int(list_id)}, timeout=120)
231+
232+
if not registrations:
233+
registrations = []
234+
235+
synced = 0
236+
for reg in registrations:
237+
# Create accounts
238+
Account.objects.get_or_create(id=reg["registrant_id"])
239+
Account.objects.get_or_create(id=reg.get("registered_by", reg["registrant_id"]))
240+
241+
# Create/update registration (id is AutoField, use list+registrant as unique key)
242+
ListRegistration.objects.update_or_create(
243+
list=list_obj,
244+
registrant_id=reg["registrant_id"],
245+
defaults={
246+
"registered_by_id": reg.get("registered_by", reg["registrant_id"]),
247+
"status": reg.get("status", "Pending"),
248+
"submitted_at": datetime.fromtimestamp(reg["submitted_ms"] / 1000),
249+
"updated_at": datetime.fromtimestamp(reg["updated_ms"] / 1000),
250+
"admin_notes": reg.get("admin_notes"),
251+
"registrant_notes": reg.get("registrant_notes"),
252+
}
253+
)
254+
synced += 1
255+
256+
return Response({
257+
"success": True,
258+
"message": f"Synced {synced} registrations",
259+
"synced_count": synced
260+
})
261+
262+
except Exception as e:
263+
logger.error(f"Error syncing registrations for list {list_id}: {e}")
264+
return Response({"error": str(e)}, status=502)
265+
266+
267+
class SingleRegistrationSyncAPI(APIView):
268+
"""
269+
Sync a single registration.
270+
271+
More efficient than syncing all when you know the registrant.
272+
"""
273+
274+
@extend_schema(
275+
summary="Sync single registration",
276+
responses={
277+
200: OpenApiResponse(description="Registration synced"),
278+
404: OpenApiResponse(description="Not found"),
279+
502: OpenApiResponse(description="RPC failed"),
280+
}
281+
)
282+
def post(self, request, list_id: int, registrant_id: str):
283+
try:
284+
# Ensure list exists
285+
try:
286+
list_obj = List.objects.get(on_chain_id=int(list_id))
287+
except List.DoesNotExist:
288+
list_sync = ListSyncAPI()
289+
resp = list_sync.post(request, list_id)
290+
if resp.status_code != 200:
291+
return Response({"error": "List not found"}, status=404)
292+
list_obj = List.objects.get(on_chain_id=int(list_id))
293+
294+
# Fetch all registrations and filter (contract doesn't have single-registration lookup by registrant_id)
295+
registrations = fetch_from_rpc("get_registrations_for_list", {"list_id": int(list_id)}, timeout=120)
296+
297+
if not registrations:
298+
return Response({"error": "No registrations found for list"}, status=404)
299+
300+
# Find the specific registration
301+
reg = next((r for r in registrations if r.get("registrant_id") == registrant_id), None)
302+
303+
if not reg:
304+
return Response({"error": "Registration not found"}, status=404)
305+
306+
# Create accounts
307+
Account.objects.get_or_create(id=reg["registrant_id"])
308+
Account.objects.get_or_create(id=reg.get("registered_by", reg["registrant_id"]))
309+
310+
# Create/update registration (id is AutoField, use list+registrant as unique key)
311+
registration, created = ListRegistration.objects.update_or_create(
312+
list=list_obj,
313+
registrant_id=reg["registrant_id"],
314+
defaults={
315+
"registered_by_id": reg.get("registered_by", reg["registrant_id"]),
316+
"status": reg.get("status", "Pending"),
317+
"submitted_at": datetime.fromtimestamp(reg["submitted_ms"] / 1000),
318+
"updated_at": datetime.fromtimestamp(reg["updated_ms"] / 1000),
319+
"admin_notes": reg.get("admin_notes"),
320+
"registrant_notes": reg.get("registrant_notes"),
321+
}
322+
)
323+
324+
return Response({
325+
"success": True,
326+
"message": "Registration synced",
327+
"registrant_id": registrant_id,
328+
"status": registration.status
329+
})
330+
331+
except Exception as e:
332+
logger.error(f"Error syncing registration: {e}")
333+
return Response({"error": str(e)}, status=502)
334+
335+
336+
class AccountSyncAPI(APIView):
337+
"""
338+
Sync account data and recalculate donation stats.
339+
340+
Called by frontend after actions that affect an account's totals.
341+
Fetches NEAR Social profile and recalculates donation totals from DB.
342+
"""
343+
344+
@extend_schema(
345+
summary="Sync account and recalculate stats",
346+
responses={
347+
200: OpenApiResponse(description="Account synced"),
348+
502: OpenApiResponse(description="RPC failed"),
349+
}
350+
)
351+
def post(self, request, account_id: str):
352+
from django.db.models import Sum, Count
353+
354+
try:
355+
account, created = Account.objects.get_or_create(id=account_id)
356+
357+
# Fetch NEAR Social profile
358+
try:
359+
profile_data = fetch_from_rpc(
360+
"get",
361+
{"keys": [f"{account_id}/profile/**"]},
362+
contract_id="social.near" if settings.ENVIRONMENT != "testnet" else "v1.social08.testnet"
363+
)
364+
if profile_data and account_id in profile_data:
365+
account.near_social_profile_data = profile_data[account_id].get("profile", {})
366+
except Exception as e:
367+
logger.warning(f"Failed to fetch social profile for {account_id}: {e}")
368+
369+
# Recalculate donation totals from existing DB records
370+
donations_received = Donation.objects.filter(recipient_id=account_id).aggregate(
371+
total_usd=Sum('total_amount_usd'),
372+
count=Count('id')
373+
)
374+
donations_sent = Donation.objects.filter(donor_id=account_id).aggregate(
375+
total_usd=Sum('total_amount_usd'),
376+
count=Count('id')
377+
)
378+
379+
account.total_donations_in_usd = donations_received['total_usd'] or 0
380+
account.total_donations_out_usd = donations_sent['total_usd'] or 0
381+
account.donors_count = Donation.objects.filter(
382+
recipient_id=account_id
383+
).values('donor').distinct().count()
384+
385+
account.save()
386+
387+
return Response({
388+
"success": True,
389+
"message": "Account synced",
390+
"account_id": account_id,
391+
"created": created,
392+
"total_donations_in_usd": str(account.total_donations_in_usd),
393+
"total_donations_out_usd": str(account.total_donations_out_usd),
394+
"donors_count": account.donors_count
395+
})
396+
397+
except Exception as e:
398+
logger.error(f"Error syncing account {account_id}: {e}")
399+
return Response({"error": str(e)}, status=502)

0 commit comments

Comments
 (0)