diff --git a/datalake/converters/dex_trades.py b/datalake/converters/dex_trades.py index 1adcf16..8ee66b2 100644 --- a/datalake/converters/dex_trades.py +++ b/datalake/converters/dex_trades.py @@ -2,7 +2,13 @@ from dataclasses import dataclass, asdict import decimal from typing import List -from topics import TOPIC_DEX_SWAPS, TOPIC_GASPUMP_EVENTS, TOPIC_TONFUN, TOPIC_MEMESLAB +from topics import ( + TOPIC_DEX_SWAPS, + TOPIC_GASPUMP_EVENTS, + TOPIC_TONFUN, + TOPIC_MEMESLAB, + TOPIC_URANUS, +) from loguru import logger from converters.converter import Converter @@ -16,6 +22,7 @@ PROJECT_TONFUN = "ton.fun" PROJECT_GASPUMP = "gaspump" PROJECT_MEMESLAB = "memeslab" +PROJECT_URANUS = "uranus" EVENT_TYPE_TRADE = "trade" EVENT_TYPE_LAUNCH = "launch" # launch from bonding curve @@ -66,7 +73,13 @@ def timestamp(self, obj): return obj['event_time'] def topics(self) -> List[str]: - return [TOPIC_DEX_SWAPS, TOPIC_GASPUMP_EVENTS, TOPIC_TONFUN, TOPIC_MEMESLAB] + return [ + TOPIC_DEX_SWAPS, + TOPIC_GASPUMP_EVENTS, + TOPIC_TONFUN, + TOPIC_MEMESLAB, + TOPIC_URANUS, + ] def convert(self, obj, table_name=None): trades = [] @@ -211,6 +224,47 @@ def convert(self, obj, table_name=None): volume_ton=int(self.decode_numeric(obj['ton_amount'])) / 1e9, volume_usd=self.decode_numeric(obj['volume_usd']), )) + elif table_name == "uranus_trade": + # Uranus memepad trades + is_buy = obj['event_type'] == 'BuyEvent' + ton_amount_raw = obj['amount_in'] if is_buy else obj['amount_out'] + common = { + 'tx_hash': obj['tx_hash'], + 'trace_id': obj['trace_id'], + 'project_type': PLATFORM_TYPE_LAUNCHPAD, + 'project': PROJECT_URANUS, + 'version': 1, + 'event_time': obj['event_time'], + 'pool_address': obj['meme_master'], + 'router_address': None, + 'query_id': None, + 'referral_address': None, + 'platform_tag': None, + } + trades.append(Trade( + **common, + event_type=EVENT_TYPE_TRADE, + trader_address=obj['trader_address'], + token_sold_address=TON_NATIVE_ADDRESS if is_buy else obj['meme_master'], + token_bought_address=obj['meme_master'] if is_buy else TON_NATIVE_ADDRESS, + amount_sold_raw=self.decode_numeric(obj['amount_in'] if is_buy else obj['amount_out']), + amount_bought_raw=self.decode_numeric(obj['amount_out'] if is_buy else obj['amount_in']), + volume_ton=int(self.decode_numeric(ton_amount_raw)) / 1e9, + volume_usd=self.decode_numeric(obj['volume_usd']), + )) + if is_buy: + if obj.get('is_graduated') is True: + trades.append(Trade( + **common, + event_type=EVENT_TYPE_LAUNCH, + trader_address=None, + token_sold_address=None, + token_bought_address=None, + amount_sold_raw=None, + amount_bought_raw=None, + volume_ton=None, + volume_usd=None, + )) for trade in trades: if trade.volume_ton is not None: diff --git a/datalake/topics.py b/datalake/topics.py index c7a073e..3cc6dd5 100644 --- a/datalake/topics.py +++ b/datalake/topics.py @@ -18,6 +18,7 @@ TOPIC_GASPUMP_EVENTS = "ton.parsed.gaspump_trade" TOPIC_TONFUN = "ton.parsed.tonfun_bcl_trade" TOPIC_MEMESLAB = "ton.parsed.memeslab_trade_event" +TOPIC_URANUS = "ton.parsed.uranus_trade" TOPIC_DEX_POOLS = "ton.prices.dex_pool" TOPIC_EXTRA_NFT_SALES = "ton.parsed.extra_nft_sales" diff --git a/parser/createdb.sql b/parser/createdb.sql index c2eac07..c62bae5 100644 --- a/parser/createdb.sql +++ b/parser/createdb.sql @@ -190,6 +190,30 @@ ALTER TABLE parsed.memeslab_trade_event DROP CONSTRAINT memeslab_trade_event_pke ALTER TABLE parsed.memeslab_trade_event ADD PRIMARY KEY (tx_hash, event_type); COMMIT; +-- Uranus memepad +CREATE TABLE IF NOT EXISTS parsed.uranus_trade ( + tx_hash bpchar(44) NULL, + trace_id bpchar(44) NULL, + msg_hash bpchar(44) NULL, + event_time int4 NULL, + meme_master varchar NULL, + event_type varchar NULL, + trader_address varchar NULL, + amount_in numeric NULL, + amount_out numeric NULL, + creator_fee numeric NULL, + protocol_fee numeric NULL, + partner_fee numeric NULL, + referrer_fee numeric NULL, + current_supply numeric NULL, + raised_funds numeric NULL, + is_graduated bool NULL, + volume_usd numeric NULL, + created timestamp NULL, + updated timestamp NULL, + CONSTRAINT uranus_trade_pkey PRIMARY KEY (tx_hash, event_type) +); + -- Adding usd volume for memepads ALTER TABLE parsed.gaspump_trade ADD column if not exists "volume_usd" numeric NULL; @@ -266,6 +290,13 @@ EXCEPTION WHEN duplicate_object THEN null; END $$; +-- CPMM v3 DEX support +DO $$ BEGIN + ALTER TYPE public.dex_name ADD VALUE 'cpmm_pool_v3' AFTER 'dedust'; +EXCEPTION + WHEN duplicate_object THEN null; +END $$; + -- Staking pools CREATE TABLE IF NOT EXISTS parsed.staking_pools_nominators ( diff --git a/parser/model/dexswap.py b/parser/model/dexswap.py index a700e85..489b4e5 100644 --- a/parser/model/dexswap.py +++ b/parser/model/dexswap.py @@ -35,6 +35,7 @@ DEX_BIDASK_CLMM = "bidask_clmm" DEX_BIDASK_DAMM = "bidask_damm" DEX_MOON = "moon.cx" +DEX_DEDUST_CPMM_V3 = "cpmm_pool_v3" @dataclass class DexSwapParsed: diff --git a/parser/model/uranus.py b/parser/model/uranus.py new file mode 100644 index 0000000..292edc6 --- /dev/null +++ b/parser/model/uranus.py @@ -0,0 +1,36 @@ +import decimal +from dataclasses import dataclass +from typing import Optional + + +@dataclass +class UranusTradeEvent: + __tablename__ = "uranus_trade" + + tx_hash: str + trace_id: str + msg_hash: str + event_time: int + meme_master: str + event_type: str # BuyEvent, SellEvent + trader_address: str + + # from BuyEvent & SellEvent + amount_in: decimal.Decimal + amount_out: decimal.Decimal + + # from TradeFees + creator_fee: decimal.Decimal + protocol_fee: decimal.Decimal + partner_fee: decimal.Decimal + referrer_fee: decimal.Decimal + + # from BuyEvent & SellEvent + current_supply: decimal.Decimal + raised_funds: decimal.Decimal + + # from BuyEvent + is_graduated: Optional[bool] + + # extra info + volume_usd: Optional[decimal.Decimal] diff --git a/parser/parsers/__init__.py b/parser/parsers/__init__.py index 6f05c5a..223a8ce 100644 --- a/parser/parsers/__init__.py +++ b/parser/parsers/__init__.py @@ -26,6 +26,8 @@ from parsers.accounts.nft_sales import NFTSalesParser from parsers.message.moon_swap_ton import MoonSwapTON from parsers.jetton_transfer.moon_swap_jetton import MoonSwapJetton +from parsers.message.dedust_swap_cpmm_v3 import CPMMV3Swap +from parsers.message.uranus import UranusTrade from model.parser import Parser from loguru import logger import os @@ -56,6 +58,8 @@ BidaskDammSwap(EMULATOR_PATH), MoonSwapTON(), MoonSwapJetton(), + CPMMV3Swap(EMULATOR_PATH), + UranusTrade(), CorePricesUSDT(), CorePricesLSDstTON(), diff --git a/parser/parsers/accounts/tvl.py b/parser/parsers/accounts/tvl.py index 1460ba8..c7bf825 100644 --- a/parser/parsers/accounts/tvl.py +++ b/parser/parsers/accounts/tvl.py @@ -5,7 +5,7 @@ from db import DB from pytoniq_core import Address from model.dexpool import DexPool -from model.dexswap import DEX_DEDUST, DEX_MEGATON, DEX_STON, DEX_STON_V2, DEX_TONCO, DEX_COFFEE, DEX_BIDASK_CLMM, DEX_BIDASK_DAMM, DEX_MOON +from model.dexswap import DEX_DEDUST, DEX_MEGATON, DEX_STON, DEX_STON_V2, DEX_TONCO, DEX_COFFEE, DEX_BIDASK_CLMM, DEX_BIDASK_DAMM, DEX_MOON, DEX_DEDUST_CPMM_V3 from model.dedust import read_dedust_asset from model.coffee import read_coffee_asset from parsers.message.swap_volume import estimate_tvl @@ -43,7 +43,7 @@ def _do_parse(self, obj, db: DB, emulator: TvmEmulator): pool.last_updated = obj['timestamp'] # total supply is required for all cases except TONCO, Bidask DLMM - if pool.platform not in [DEX_TONCO, DEX_BIDASK_CLMM]: + if pool.platform not in [DEX_TONCO, DEX_BIDASK_CLMM, DEX_DEDUST_CPMM_V3]: try: pool.total_supply, _, _, _, _= self._execute_method(emulator, 'get_jetton_data', [], db, obj) except EmulatorException as e: @@ -210,6 +210,9 @@ def _do_parse(self, obj, db: DB, emulator: TvmEmulator): pool.lp_fee = lp_fee / 1e4 if lp_fee is not None else None pool.protocol_fee = protocol_fee / 1e4 if protocol_fee is not None else None pool.referral_fee = ref_fee / 1e4 if ref_fee is not None else None + elif pool.platform == DEX_DEDUST_CPMM_V3: + logger.warning(f"CPMM v3 TVL parsing not implemented for pool {pool.pool}") + return else: raise Exception(f"DEX is not supported: {pool.platform}") diff --git a/parser/parsers/message/dedust_swap_cpmm_v3.py b/parser/parsers/message/dedust_swap_cpmm_v3.py new file mode 100644 index 0000000..3ec2de5 --- /dev/null +++ b/parser/parsers/message/dedust_swap_cpmm_v3.py @@ -0,0 +1,156 @@ +import base64 +from typing import Optional + +from db import DB +from loguru import logger +from model.dexswap import DEX_DEDUST_CPMM_V3, DexSwapParsed +from model.parser import TOPIC_MESSAGES, Parser +from parsers.accounts.emulator import EmulatorParser +from parsers.message.swap_volume import estimate_volume +from pytoniq_core import Address, Cell + +TON_NATIVE_ADDRESS = ( + "0:0000000000000000000000000000000000000000000000000000000000000000" +) + + +def _addr_or_zero(addr: Optional[Address]) -> str: + return addr.to_str(is_user_friendly=False).upper() if addr else TON_NATIVE_ADDRESS + +class CPMMV3Swap(EmulatorParser): + """Parses swaps emitted by CPMM v3 pools (default & uranus-linked).""" + + POOL_CODE_HASHES = { + "OZelwe6JI+k886apjqPvlILGTdiuorqDZWieFNmcdg0=", + } + + SWAP_EVENT_OPCODE = Parser.opcode_signed(0x78E79BA4) + + def __init__(self, emulator_path: str): + super().__init__(emulator_path) + self.valid_pools: set[str] = set() + + def topics(self): + return [TOPIC_MESSAGES] + + def predicate(self, obj) -> bool: + return ( + obj.get("opcode") == self.SWAP_EVENT_OPCODE + and obj.get("direction") == "out" + and obj.get("destination") is None + ) + + def prepare(self, db: DB): + super().prepare(db) + + def _is_valid_pool(self, pool_address: Address, pool_state: dict) -> bool: + address_key = pool_address.to_str(is_user_friendly=False) + if address_key in self.valid_pools: + return True + + if not pool_state or not pool_state.get("code_boc"): + return False + + code_hash = base64.b64encode( + Cell.one_from_boc(pool_state["code_boc"])._hash + ).decode("utf-8") + + if code_hash in self.POOL_CODE_HASHES: + self.valid_pools.add(address_key) + return True + + logger.warning(f"Invalid CPMM v3 pool code hash {code_hash} for {address_key}") + return False + + def _parse_tokens(self, emulator, db: DB, pool_state: dict): + # get_pool_data returns tuple [status, deposit_active, swap_active, assetX, assetY, ...] + _, _, _, asset_x_cell, asset_y_cell, *_ = self._execute_method( + emulator, "get_pool_data", [], db, pool_state + ) + + def parse_asset(cell: Cell): + slice_ = cell.begin_parse() if hasattr(cell, "begin_parse") else cell + try: + return slice_.load_address(), True + except ValueError as exc: + logger.warning(f"Failed to parse pool asset cell: {exc}") + return None, False + + asset_x, asset_x_ok = parse_asset(asset_x_cell) + asset_y, asset_y_ok = parse_asset(asset_y_cell) + if not asset_x_ok or not asset_y_ok: + return None, None, False + + return asset_x, asset_y, True + + def handle_internal(self, obj, db: DB): + tx_hash = Parser.require(obj.get("tx_hash")) + if not Parser.require(db.is_tx_successful(tx_hash)): + logger.info(f"Skipping failed tx for {tx_hash}") + return + + pool_address = Address(Parser.require(obj.get("source"))) + pool_state = Parser.get_account_state_safe(pool_address, db) + if not pool_state or not pool_state.get("code_boc"): + logger.warning( + f"Account state missing for {pool_address.to_str(is_user_friendly=False)}" + ) + return + if not self._is_valid_pool(pool_address, pool_state): + return + + cell = Parser.message_body(obj, db).begin_parse() + opcode_prefix = Parser.opcode_signed(cell.load_uint(32)) + if opcode_prefix != self.SWAP_EVENT_OPCODE: + logger.debug(f"Unexpected opcode {opcode_prefix} for CPMM swap") + return + + x_to_y = cell.load_bit() + amount_in = cell.load_coins() + amount_out = cell.load_coins() + if amount_in == 0 or amount_out == 0: + logger.info(f"Skipping zero amount swap for {tx_hash}") + return + + initiator = cell.load_address() + cell.load_address() + cell.load_ref() + fees = cell.load_ref().begin_parse() + fees.load_bit() + lp_fee = fees.load_coins() + creator_fee = fees.load_coins() + protocol_fee = fees.load_coins() + partner_fee = fees.load_coins() + referrer_fee = fees.load_coins() + + pool_emulator = self._prepare_emulator(pool_state) + asset_x, asset_y, assets_ok = self._parse_tokens(pool_emulator, db, pool_state) + if not assets_ok: + logger.warning( + f"Missing pool assets for {pool_address.to_str(is_user_friendly=False)}" + ) + return + src_token = asset_x if x_to_y else asset_y + dst_token = asset_y if x_to_y else asset_x + + swap = DexSwapParsed( + tx_hash=tx_hash, + msg_hash=Parser.require(obj.get("msg_hash")), + trace_id=Parser.require(obj.get("trace_id")), + platform=DEX_DEDUST_CPMM_V3, + swap_utime=Parser.require(obj.get("created_at")), + swap_user=initiator.to_str(is_user_friendly=False).upper() + if initiator + else None, + swap_pool=pool_address.to_str(is_user_friendly=False).upper(), + swap_src_token=_addr_or_zero(src_token), + swap_dst_token=_addr_or_zero(dst_token), + swap_src_amount=amount_in, + swap_dst_amount=amount_out, + referral_address=None, + query_id=None, + ) + + estimate_volume(swap, db) + db.serialize(swap) + db.discover_dex_pool(swap) diff --git a/parser/parsers/message/uranus.py b/parser/parsers/message/uranus.py new file mode 100644 index 0000000..aa09275 --- /dev/null +++ b/parser/parsers/message/uranus.py @@ -0,0 +1,148 @@ +import base64 +import traceback + +from db import DB +from loguru import logger +from model.parser import TOPIC_MESSAGES, NonCriticalParserError, Parser +from model.uranus import UranusTradeEvent +from parsers.message.swap_volume import USDT +from pytoniq_core import Address, Cell + +"""Uranus bonding curve parser.""" + +URANUS_MASTER_CODE_HASHES = { + "ci03vlGO4NS2cUB3cn7WByS/N56PFHnCILhT0a888A0=", +} + +BUY_EVENT_OPCODE = Parser.opcode_signed(0xA0AA6BC2) +SELL_EVENT_OPCODE = Parser.opcode_signed(0x3AB0FCCC) +EVENTS = [BUY_EVENT_OPCODE, SELL_EVENT_OPCODE] +EVENT_TYPE_BY_OPCODE = { + BUY_EVENT_OPCODE: "BuyEvent", + SELL_EVENT_OPCODE: "SellEvent", +} + + +class UranusTrade(Parser): + def __init__(self): + self._graduated_seen: set[str] = set() + + def topics(self): + return [TOPIC_MESSAGES] + + def predicate(self, obj) -> bool: + return ( + obj.get("opcode", None) in EVENTS + and obj.get("direction", None) == "out" + and obj.get("destination") is None + ) + + def handle_internal(self, obj, db: DB): + try: + tx_hash = Parser.require(obj.get("tx_hash", None)) + trace_id = Parser.require(obj.get("trace_id", None)) + msg_hash = Parser.require(obj.get("msg_hash", None)) + created_at = Parser.require(obj.get("created_at", None)) + source_addr_raw = Parser.require(obj.get("source", None)) + opcode = Parser.require(obj.get("opcode", None)) + + event_type = EVENT_TYPE_BY_OPCODE.get(opcode) + if event_type is None: + logger.warning(f"Unknown uranus event opcode: {opcode}") + return + is_buy_event = opcode == BUY_EVENT_OPCODE + + source_addr = Address(source_addr_raw) + account_state = Parser.get_account_state_safe(source_addr, db) + if not account_state or not account_state.get("code_boc"): + logger.warning( + f"Account state for {source_addr_raw} missing, skip uranus trade" + ) + return + + code_hash_b64 = base64.b64encode( + Cell.one_from_boc(account_state["code_boc"])._hash + ).decode("utf-8") + if code_hash_b64 not in URANUS_MASTER_CODE_HASHES: + logger.warning( + f"Code hash {code_hash_b64} for {source_addr_raw} not in uranus master whitelist" + ) + return + meme_master_raw = source_addr.to_str(is_user_friendly=False).upper() + + cell = Parser.message_body(obj, db).begin_parse() + opcode_raw = cell.load_uint(32) + body_opcode = Parser.opcode_signed(opcode_raw) + if body_opcode != opcode: + logger.warning( + f"Uranus opcode mismatch: header {opcode} vs body {body_opcode}" + ) + return + + trader_address = cell.load_address() + amount_in = cell.load_coins() + amount_out = cell.load_coins() + + if amount_in == 0 or amount_out == 0: + logger.debug(f"Skipping zero amount uranus trade for {tx_hash}") + return + + creator_fee = cell.load_coins() + protocol_fee = cell.load_coins() + partner_fee = cell.load_coins() + referrer_fee = cell.load_coins() + + current_supply = cell.load_coins() + raised_funds = cell.load_coins() + + is_graduated = None + if is_buy_event: + is_graduated = bool(cell.load_bit()) + if is_graduated and not self._is_first_graduation(meme_master_raw): + is_graduated = False + + ton_price = db.get_core_price(USDT, created_at) + if ton_price is None: + logger.warning(f"No TON price found for {created_at}") + ton_price = 0 + + ton_amount = amount_in if is_buy_event else amount_out + volume_usd = ton_amount * ton_price / 1e6 + + event = UranusTradeEvent( + tx_hash=tx_hash, + trace_id=trace_id, + msg_hash=msg_hash, + event_time=created_at, + meme_master=meme_master_raw, + event_type=event_type, + trader_address=trader_address.to_str(is_user_friendly=False).upper() + if trader_address + else None, + amount_in=amount_in, + amount_out=amount_out, + creator_fee=creator_fee, + protocol_fee=protocol_fee, + partner_fee=partner_fee, + referrer_fee=referrer_fee, + current_supply=current_supply, + raised_funds=raised_funds, + is_graduated=is_graduated, + volume_usd=volume_usd, + ) + + db.serialize(event) + except Exception as e: + logger.error( + f"Failed to parse uranus trade event: {e} {traceback.format_exc()}" + ) + raise NonCriticalParserError( + f"Failed to parse uranus trade event: {e} {traceback.format_exc()}" + ) from e + + def _is_first_graduation(self, meme_master_raw: str) -> bool: + if meme_master_raw in self._graduated_seen: + return False + + self._graduated_seen.add(meme_master_raw) + return True