Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 56 additions & 2 deletions datalake/converters/dex_trades.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@
from dataclasses import dataclass, asdict
import decimal
from typing import List
from topics import TOPIC_DEX_SWAPS, TOPIC_GASPUMP_EVENTS, TOPIC_TONFUN, TOPIC_MEMESLAB
from topics import (
TOPIC_DEX_SWAPS,
TOPIC_GASPUMP_EVENTS,
TOPIC_TONFUN,
TOPIC_MEMESLAB,
TOPIC_URANUS,
)
from loguru import logger
from converters.converter import Converter

Expand All @@ -16,6 +22,7 @@
PROJECT_TONFUN = "ton.fun"
PROJECT_GASPUMP = "gaspump"
PROJECT_MEMESLAB = "memeslab"
PROJECT_URANUS = "uranus"

EVENT_TYPE_TRADE = "trade"
EVENT_TYPE_LAUNCH = "launch" # launch from bonding curve
Expand Down Expand Up @@ -66,7 +73,13 @@ def timestamp(self, obj):
return obj['event_time']

def topics(self) -> List[str]:
return [TOPIC_DEX_SWAPS, TOPIC_GASPUMP_EVENTS, TOPIC_TONFUN, TOPIC_MEMESLAB]
return [
TOPIC_DEX_SWAPS,
TOPIC_GASPUMP_EVENTS,
TOPIC_TONFUN,
TOPIC_MEMESLAB,
TOPIC_URANUS,
]

def convert(self, obj, table_name=None):
trades = []
Expand Down Expand Up @@ -211,6 +224,47 @@ def convert(self, obj, table_name=None):
volume_ton=int(self.decode_numeric(obj['ton_amount'])) / 1e9,
volume_usd=self.decode_numeric(obj['volume_usd']),
))
elif table_name == "uranus_trade":
# Uranus memepad trades
is_buy = obj['event_type'] == 'BuyEvent'
ton_amount_raw = obj['amount_in'] if is_buy else obj['amount_out']
common = {
'tx_hash': obj['tx_hash'],
'trace_id': obj['trace_id'],
'project_type': PLATFORM_TYPE_LAUNCHPAD,
'project': PROJECT_URANUS,
'version': 1,
'event_time': obj['event_time'],
'pool_address': obj['meme_master'],
'router_address': None,
'query_id': None,
'referral_address': None,
'platform_tag': None,
}
trades.append(Trade(
**common,
event_type=EVENT_TYPE_TRADE,
trader_address=obj['trader_address'],
token_sold_address=TON_NATIVE_ADDRESS if is_buy else obj['meme_master'],
token_bought_address=obj['meme_master'] if is_buy else TON_NATIVE_ADDRESS,
amount_sold_raw=self.decode_numeric(obj['amount_in'] if is_buy else obj['amount_out']),
amount_bought_raw=self.decode_numeric(obj['amount_out'] if is_buy else obj['amount_in']),
volume_ton=int(self.decode_numeric(ton_amount_raw)) / 1e9,
volume_usd=self.decode_numeric(obj['volume_usd']),
))
if is_buy:
if obj.get('is_graduated') is True:
trades.append(Trade(
**common,
event_type=EVENT_TYPE_LAUNCH,
trader_address=None,
token_sold_address=None,
token_bought_address=None,
amount_sold_raw=None,
amount_bought_raw=None,
volume_ton=None,
volume_usd=None,
))

for trade in trades:
if trade.volume_ton is not None:
Expand Down
1 change: 1 addition & 0 deletions datalake/topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
TOPIC_GASPUMP_EVENTS = "ton.parsed.gaspump_trade"
TOPIC_TONFUN = "ton.parsed.tonfun_bcl_trade"
TOPIC_MEMESLAB = "ton.parsed.memeslab_trade_event"
TOPIC_URANUS = "ton.parsed.uranus_trade"

TOPIC_DEX_POOLS = "ton.prices.dex_pool"
TOPIC_EXTRA_NFT_SALES = "ton.parsed.extra_nft_sales"
31 changes: 31 additions & 0 deletions parser/createdb.sql
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,30 @@ ALTER TABLE parsed.memeslab_trade_event DROP CONSTRAINT memeslab_trade_event_pke
ALTER TABLE parsed.memeslab_trade_event ADD PRIMARY KEY (tx_hash, event_type);
COMMIT;

-- Uranus memepad
CREATE TABLE IF NOT EXISTS parsed.uranus_trade (
tx_hash bpchar(44) NULL,
trace_id bpchar(44) NULL,
msg_hash bpchar(44) NULL,
event_time int4 NULL,
meme_master varchar NULL,
event_type varchar NULL,
trader_address varchar NULL,
amount_in numeric NULL,
amount_out numeric NULL,
creator_fee numeric NULL,
protocol_fee numeric NULL,
partner_fee numeric NULL,
referrer_fee numeric NULL,
current_supply numeric NULL,
raised_funds numeric NULL,
is_graduated bool NULL,
volume_usd numeric NULL,
created timestamp NULL,
updated timestamp NULL,
CONSTRAINT uranus_trade_pkey PRIMARY KEY (tx_hash, event_type)
);


-- Adding usd volume for memepads
ALTER TABLE parsed.gaspump_trade ADD column if not exists "volume_usd" numeric NULL;
Expand Down Expand Up @@ -266,6 +290,13 @@ EXCEPTION
WHEN duplicate_object THEN null;
END $$;

-- CPMM v3 DEX support
DO $$ BEGIN
ALTER TYPE public.dex_name ADD VALUE 'cpmm_pool_v3' AFTER 'dedust';
EXCEPTION
WHEN duplicate_object THEN null;
END $$;

-- Staking pools

CREATE TABLE IF NOT EXISTS parsed.staking_pools_nominators (
Expand Down
1 change: 1 addition & 0 deletions parser/model/dexswap.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
DEX_BIDASK_CLMM = "bidask_clmm"
DEX_BIDASK_DAMM = "bidask_damm"
DEX_MOON = "moon.cx"
DEX_DEDUST_CPMM_V3 = "cpmm_pool_v3"

@dataclass
class DexSwapParsed:
Expand Down
36 changes: 36 additions & 0 deletions parser/model/uranus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import decimal
from dataclasses import dataclass
from typing import Optional


@dataclass
class UranusTradeEvent:
__tablename__ = "uranus_trade"

tx_hash: str
trace_id: str
msg_hash: str
event_time: int
meme_master: str
event_type: str # BuyEvent, SellEvent
trader_address: str

# from BuyEvent & SellEvent
amount_in: decimal.Decimal
amount_out: decimal.Decimal

# from TradeFees
creator_fee: decimal.Decimal
protocol_fee: decimal.Decimal
partner_fee: decimal.Decimal
referrer_fee: decimal.Decimal

# from BuyEvent & SellEvent
current_supply: decimal.Decimal
raised_funds: decimal.Decimal

# from BuyEvent
is_graduated: Optional[bool]

# extra info
volume_usd: Optional[decimal.Decimal]
4 changes: 4 additions & 0 deletions parser/parsers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
from parsers.accounts.nft_sales import NFTSalesParser
from parsers.message.moon_swap_ton import MoonSwapTON
from parsers.jetton_transfer.moon_swap_jetton import MoonSwapJetton
from parsers.message.dedust_swap_cpmm_v3 import CPMMV3Swap
from parsers.message.uranus import UranusTrade
from model.parser import Parser
from loguru import logger
import os
Expand Down Expand Up @@ -56,6 +58,8 @@
BidaskDammSwap(EMULATOR_PATH),
MoonSwapTON(),
MoonSwapJetton(),
CPMMV3Swap(EMULATOR_PATH),
UranusTrade(),

CorePricesUSDT(),
CorePricesLSDstTON(),
Expand Down
7 changes: 5 additions & 2 deletions parser/parsers/accounts/tvl.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from db import DB
from pytoniq_core import Address
from model.dexpool import DexPool
from model.dexswap import DEX_DEDUST, DEX_MEGATON, DEX_STON, DEX_STON_V2, DEX_TONCO, DEX_COFFEE, DEX_BIDASK_CLMM, DEX_BIDASK_DAMM, DEX_MOON
from model.dexswap import DEX_DEDUST, DEX_MEGATON, DEX_STON, DEX_STON_V2, DEX_TONCO, DEX_COFFEE, DEX_BIDASK_CLMM, DEX_BIDASK_DAMM, DEX_MOON, DEX_DEDUST_CPMM_V3
from model.dedust import read_dedust_asset
from model.coffee import read_coffee_asset
from parsers.message.swap_volume import estimate_tvl
Expand Down Expand Up @@ -43,7 +43,7 @@ def _do_parse(self, obj, db: DB, emulator: TvmEmulator):
pool.last_updated = obj['timestamp']

# total supply is required for all cases except TONCO, Bidask DLMM
if pool.platform not in [DEX_TONCO, DEX_BIDASK_CLMM]:
if pool.platform not in [DEX_TONCO, DEX_BIDASK_CLMM, DEX_DEDUST_CPMM_V3]:
try:
pool.total_supply, _, _, _, _= self._execute_method(emulator, 'get_jetton_data', [], db, obj)
except EmulatorException as e:
Expand Down Expand Up @@ -210,6 +210,9 @@ def _do_parse(self, obj, db: DB, emulator: TvmEmulator):
pool.lp_fee = lp_fee / 1e4 if lp_fee is not None else None
pool.protocol_fee = protocol_fee / 1e4 if protocol_fee is not None else None
pool.referral_fee = ref_fee / 1e4 if ref_fee is not None else None
elif pool.platform == DEX_DEDUST_CPMM_V3:
logger.warning(f"CPMM v3 TVL parsing not implemented for pool {pool.pool}")
return
else:
raise Exception(f"DEX is not supported: {pool.platform}")

Expand Down
156 changes: 156 additions & 0 deletions parser/parsers/message/dedust_swap_cpmm_v3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
import base64
from typing import Optional

from db import DB
from loguru import logger
from model.dexswap import DEX_DEDUST_CPMM_V3, DexSwapParsed
from model.parser import TOPIC_MESSAGES, Parser
from parsers.accounts.emulator import EmulatorParser
from parsers.message.swap_volume import estimate_volume
from pytoniq_core import Address, Cell

TON_NATIVE_ADDRESS = (
"0:0000000000000000000000000000000000000000000000000000000000000000"
)


def _addr_or_zero(addr: Optional[Address]) -> str:
return addr.to_str(is_user_friendly=False).upper() if addr else TON_NATIVE_ADDRESS

class CPMMV3Swap(EmulatorParser):
"""Parses swaps emitted by CPMM v3 pools (default & uranus-linked)."""

POOL_CODE_HASHES = {
"OZelwe6JI+k886apjqPvlILGTdiuorqDZWieFNmcdg0=",
}

SWAP_EVENT_OPCODE = Parser.opcode_signed(0x78E79BA4)

def __init__(self, emulator_path: str):
super().__init__(emulator_path)
self.valid_pools: set[str] = set()

def topics(self):
return [TOPIC_MESSAGES]

def predicate(self, obj) -> bool:
return (
obj.get("opcode") == self.SWAP_EVENT_OPCODE
and obj.get("direction") == "out"
and obj.get("destination") is None
)

def prepare(self, db: DB):
super().prepare(db)

def _is_valid_pool(self, pool_address: Address, pool_state: dict) -> bool:
address_key = pool_address.to_str(is_user_friendly=False)
if address_key in self.valid_pools:
return True

if not pool_state or not pool_state.get("code_boc"):
return False

code_hash = base64.b64encode(
Cell.one_from_boc(pool_state["code_boc"])._hash
).decode("utf-8")

if code_hash in self.POOL_CODE_HASHES:
self.valid_pools.add(address_key)
return True

logger.warning(f"Invalid CPMM v3 pool code hash {code_hash} for {address_key}")
return False

def _parse_tokens(self, emulator, db: DB, pool_state: dict):
# get_pool_data returns tuple [status, deposit_active, swap_active, assetX, assetY, ...]
_, _, _, asset_x_cell, asset_y_cell, *_ = self._execute_method(
emulator, "get_pool_data", [], db, pool_state
)

def parse_asset(cell: Cell):
slice_ = cell.begin_parse() if hasattr(cell, "begin_parse") else cell
try:
return slice_.load_address(), True
except ValueError as exc:
logger.warning(f"Failed to parse pool asset cell: {exc}")
return None, False

asset_x, asset_x_ok = parse_asset(asset_x_cell)
asset_y, asset_y_ok = parse_asset(asset_y_cell)
if not asset_x_ok or not asset_y_ok:
return None, None, False

return asset_x, asset_y, True

def handle_internal(self, obj, db: DB):
tx_hash = Parser.require(obj.get("tx_hash"))
if not Parser.require(db.is_tx_successful(tx_hash)):
logger.info(f"Skipping failed tx for {tx_hash}")
return

pool_address = Address(Parser.require(obj.get("source")))
pool_state = Parser.get_account_state_safe(pool_address, db)
if not pool_state or not pool_state.get("code_boc"):
logger.warning(
f"Account state missing for {pool_address.to_str(is_user_friendly=False)}"
)
return
if not self._is_valid_pool(pool_address, pool_state):
return

cell = Parser.message_body(obj, db).begin_parse()
opcode_prefix = Parser.opcode_signed(cell.load_uint(32))
if opcode_prefix != self.SWAP_EVENT_OPCODE:
logger.debug(f"Unexpected opcode {opcode_prefix} for CPMM swap")
return

x_to_y = cell.load_bit()
amount_in = cell.load_coins()
amount_out = cell.load_coins()
if amount_in == 0 or amount_out == 0:
logger.info(f"Skipping zero amount swap for {tx_hash}")
return

initiator = cell.load_address()
cell.load_address()
cell.load_ref()
fees = cell.load_ref().begin_parse()
fees.load_bit()
lp_fee = fees.load_coins()
creator_fee = fees.load_coins()
protocol_fee = fees.load_coins()
partner_fee = fees.load_coins()
referrer_fee = fees.load_coins()

pool_emulator = self._prepare_emulator(pool_state)
asset_x, asset_y, assets_ok = self._parse_tokens(pool_emulator, db, pool_state)
if not assets_ok:
logger.warning(
f"Missing pool assets for {pool_address.to_str(is_user_friendly=False)}"
)
return
src_token = asset_x if x_to_y else asset_y
dst_token = asset_y if x_to_y else asset_x

swap = DexSwapParsed(
tx_hash=tx_hash,
msg_hash=Parser.require(obj.get("msg_hash")),
trace_id=Parser.require(obj.get("trace_id")),
platform=DEX_DEDUST_CPMM_V3,
swap_utime=Parser.require(obj.get("created_at")),
swap_user=initiator.to_str(is_user_friendly=False).upper()
if initiator
else None,
swap_pool=pool_address.to_str(is_user_friendly=False).upper(),
swap_src_token=_addr_or_zero(src_token),
swap_dst_token=_addr_or_zero(dst_token),
swap_src_amount=amount_in,
swap_dst_amount=amount_out,
referral_address=None,
query_id=None,
)

estimate_volume(swap, db)
db.serialize(swap)
db.discover_dex_pool(swap)
Loading