Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions datalake/airflow/dags/datalake_daily_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def datalake_daily_sync():
datalake_exporters_prefix = Variable.get("DATALAKE_ATHENA_DATALAKE_EXPORTERS_PREFIX")
datalake_athena_temp_bucket = Variable.get("DATALAKE_TMP_LOCATION")
env_tag = Variable.get("DATALAKE_TARGET_DATABASE")
is_testnet_mode = True if int(Variable.get("TESTNET_MODE", "0")) else False

def safe_python_callable(func, kwargs, step_name):
try:
Expand Down Expand Up @@ -219,6 +220,9 @@ def convert_table(kwargs):
waited = 0
MAX_WAIT_TIME = 7200
while True:
if is_testnet_mode:
logging.info("Kafka check skipped in TESTNET_MODE")
break
group_id = kwargs['kafka_group_id']
field = kwargs.get('topics_timestamp_field', kwargs['repartition_field'])
min_timestamp = check_kafka_consumer_state(group_id=group_id, field=field, allow_empty=allow_empty_partitions)
Expand Down Expand Up @@ -338,6 +342,10 @@ def execute_athena_query(query, database=source_database):
Checks Kafka committed offset for the consumer group
"""
def check_kafka_offset(kwargs):
if kwargs.get('skip_in_testnet_mode'):
logging.info("Task skipped in TESTNET_MODE")
return

task_instance = kwargs['task_instance']
group_id = kwargs['kafka_group_id']
field = kwargs['field']
Expand Down Expand Up @@ -458,7 +466,8 @@ def check_kafka_offset(kwargs):
op_kwargs={
'kafka_group_id': 'core_prices',
'topic': 'ton.public.latest_account_states',
'field': 'timestamp'
'field': 'timestamp',
'skip_in_testnet_mode': is_testnet_mode,
}
)

Expand All @@ -468,7 +477,8 @@ def check_kafka_offset(kwargs):
op_kwargs={
'kafka_group_id': 'dex_tvl_parsing',
'topic': 'ton.public.latest_account_states',
'field': 'timestamp'
'field': 'timestamp',
'skip_in_testnet_mode': is_testnet_mode,
}
)

Expand All @@ -478,7 +488,8 @@ def check_kafka_offset(kwargs):
op_kwargs={
'kafka_group_id': 'jettons_megaton',
'topic': 'ton.public.jetton_transfers',
'field': 'tx_now'
'field': 'tx_now',
'skip_in_testnet_mode': is_testnet_mode,
}
)

Expand Down
5 changes: 4 additions & 1 deletion parser/model/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class Parser:

IGNORE_MISSING_PARENT_MESSAGE_BODY = int(os.environ.get("IGNORE_MISSING_PARENT_MESSAGE_BODY", '0')) == 1

TESTNET_MODE = int(os.environ.get("TESTNET_MODE", '0')) == 1

"""
To be invoked before starting parser with the DB instance
"""
Expand Down Expand Up @@ -123,7 +125,8 @@ def get_account_state_safe(clz, address: Address, db: DB):

logger.info(f"Fetching account state from toncenter RPC for {address}")

res = requests.get(f"https://toncenter.com/api/v3/accountStates?address={address.to_str(is_user_friendly=False)}")
toncenter_base_url = "https://testnet.toncenter.com" if Parser.TESTNET_MODE else "https://toncenter.com"
res = requests.get(f"{toncenter_base_url}/api/v3/accountStates?address={address.to_str(is_user_friendly=False)}")
if res.status_code != 200:
raise Exception(f"Failed to fetch account state from toncenter RPC for {address}")

Expand Down
49 changes: 35 additions & 14 deletions parser/parsers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
from typing import Dict, List, Set
from parsers.accounts.nfts_parser import NFTItemsParser
from parsers.accounts.nfts_parser import NFTItemsParser, TestnetNFTItemsParser
from parsers.accounts.staking_pools import StakingPoolsParser
from parsers.message.tonco import TONCOSwap
from parsers.message.tonco import TONCOSwap, TestnetTONCOSwap
from parsers.jetton_transfer.megaton import MegatonDexSwap
from parsers.message.tonfun import TonFunTrade
from parsers.jetton_masters.jetton_metadata import JettonMastersMetadataParser
from parsers.message.stonfi_swap_v2 import StonfiSwapV2
from parsers.jetton_masters.jetton_metadata import JettonMastersMetadataParser, TestnetJettonMastersMetadataParser
from parsers.message.stonfi_swap_v2 import StonfiSwapV2, TestnetStonfiSwapV2
from parsers.message.gaspump import GasPumpTrade
from parsers.accounts.tvl import TVLPoolStateParser
from parsers.accounts.jetton_wallets_recover import JettonWalletsRecover
from parsers.accounts.nfts_recover import NFTsRecover
from parsers.message_contents.decode_comment import CommentsDecoder
from parsers.accounts.core_prices import CorePricesHipoTON, CorePricesLSDstTON, CorePricesLSDtsTON, CorePricesStormTrade, CorePricesUSDT
from parsers.message.dedust_swap import DedustSwap
from parsers.message.stonfi_swap import StonfiSwap
from parsers.message.jetton_mint import JettonMintParser, HipoTokensMinted
from parsers.message.dedust_swap import DedustSwap, TestnetDedustSwap
from parsers.message.stonfi_swap import StonfiSwap, TestnetStonfiSwap
from parsers.message.jetton_mint import JettonMintParser, HipoTokensMinted, TestnetHipoTokensMinted
from parsers.nft_transfer.nft_history import NftHistoryParser
from parsers.nft_items.nft_item_metadata import NFTItemMetadataParser
from parsers.nft_collections.nft_collection_metadata import NFTCollectionMetadataParser
from parsers.nft_items.nft_item_metadata import NFTItemMetadataParser, TestnetNFTItemMetadataParser
from parsers.nft_collections.nft_collection_metadata import NFTCollectionMetadataParser, TestnetNFTCollectionMetadataParser
from parsers.message.memeslab import MemesLabTrade
from model.parser import Parser
from loguru import logger
Expand All @@ -27,9 +27,10 @@
METADATA_FETCH_TIMEOUT = int(os.environ.get("METADATA_FETCH_TIMEOUT", "10"))
METADATA_FETCH_MAX_ATTEMPTS = int(os.environ.get("METADATA_FETCH_MAX_ATTEMPTS", "3"))
TONAPI_ONLY_MODE = os.environ.get("TONAPI_ONLY_MODE", "0").lower() in ('true', '1')
TESTNET_MODE = int(os.environ.get("TESTNET_MODE", "0"))

_parsers = [
NftHistoryParser(),
_mainnet_parsers = [
NftHistoryParser(), # depricated!

# DEX trades
DedustSwap(EMULATOR_PATH),
Expand Down Expand Up @@ -59,18 +60,38 @@
TVLPoolStateParser(EMULATOR_PATH),
StakingPoolsParser(EMULATOR_PATH),

NFTsRecover(EMULATOR_PATH),
JettonWalletsRecover(EMULATOR_PATH),
NFTsRecover(EMULATOR_PATH), # depricated!
JettonWalletsRecover(EMULATOR_PATH), # depricated?
NFTItemsParser(EMULATOR_PATH),

CommentsDecoder(),
CommentsDecoder(), # depricated!

JettonMastersMetadataParser(METADATA_FETCH_TIMEOUT, METADATA_FETCH_MAX_ATTEMPTS),

NFTItemMetadataParser(METADATA_FETCH_TIMEOUT, METADATA_FETCH_MAX_ATTEMPTS, TONAPI_ONLY_MODE),
NFTCollectionMetadataParser(METADATA_FETCH_TIMEOUT, METADATA_FETCH_MAX_ATTEMPTS, TONAPI_ONLY_MODE)
]

# Parser set used when TESTNET_MODE is enabled. Testnet subclasses override
# only network-specific constants (factory/router/master addresses, API base
# URLs); parsers with no network-specific state are reused unchanged.
_testnet_parsers = [
    TestnetDedustSwap(EMULATOR_PATH),
    TestnetStonfiSwap(),
    TestnetStonfiSwapV2(),
    TestnetTONCOSwap(),

    # Jetton mints: the generic mint parser is network-agnostic; Hipo needs
    # its testnet master address.
    JettonMintParser(),
    TestnetHipoTokensMinted(),

    TVLPoolStateParser(EMULATOR_PATH),

    TestnetNFTItemsParser(EMULATOR_PATH), # TODO find testnet addresses

    TestnetJettonMastersMetadataParser(METADATA_FETCH_TIMEOUT, METADATA_FETCH_MAX_ATTEMPTS),
    TestnetNFTItemMetadataParser(METADATA_FETCH_TIMEOUT, METADATA_FETCH_MAX_ATTEMPTS, TONAPI_ONLY_MODE),
    TestnetNFTCollectionMetadataParser(METADATA_FETCH_TIMEOUT, METADATA_FETCH_MAX_ATTEMPTS, TONAPI_ONLY_MODE)
]

# Active parser set, chosen once at import time from the TESTNET_MODE env var.
_parsers = _testnet_parsers if TESTNET_MODE else _mainnet_parsers

"""
dict of parsers, where key is the topic name
"""
Expand Down
6 changes: 5 additions & 1 deletion parser/parsers/accounts/emulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@

def create_lite_client():
config_path = os.getenv("LITECLIENT_CONFIG_OVERRIDE", None)
is_testnet_mode = int(os.getenv("TESTNET_MODE", "0"))
if config_path is None:
return LiteClient.from_mainnet_config(ls_i=0, trust_level=2, timeout=30)
if is_testnet_mode:
return LiteClient.from_testnet_config(ls_i=0, trust_level=2, timeout=30)
else:
return LiteClient.from_mainnet_config(ls_i=0, trust_level=2, timeout=30)
else:
logger.info(f"Using liteclient config from {config_path}")
with open(config_path) as src:
Expand Down
29 changes: 18 additions & 11 deletions parser/parsers/accounts/nfts_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,6 @@
from pytvm.tvm_emulator.tvm_emulator import TvmEmulator
from parsers.accounts.emulator import EmulatorException, EmulatorParser

TON_DNS = Address('0:B774D95EB20543F186C06B371AB88AD704F7E256130CAF96189368A7D0CB6CCF')
TELEMINT_COLLECTIONS = [
Address('EQAOQdwdw8kGftJCSFgOErM1mBjYPe4DBPq8-AhF6vr9si5N'), # Anonymous Telegram Numbers
Address('EQCA14o1-VWhS2efqoh_9M1b_A9DtKTuoqfmkn83AbJzwnPi') # Telegram Usernames
]
TELEMINT_CODE_HASHES = [
'i3EeaAlnLAM13GHR2h3MQFnJcySK9we9WGJFCF6nCzI=', # Telegram gifts
'MNzX1bb89ZaMJ5Hh4xrXMvJnLymG4daXDrA8yiLSMlE=', # Telegram gifts
]
KEY_DOMAIN = 'domain'
KEY_MAX_BID_ADDRESS = 'max_bid_address'
KEY_MAX_BID_AMOUNT = 'max_bid_amount'
Expand All @@ -28,6 +19,16 @@
Reimplementation of public.nft_items table from ton-index-worker.
"""
class NFTItemsParser(EmulatorParser):
TON_DNS = Address('0:B774D95EB20543F186C06B371AB88AD704F7E256130CAF96189368A7D0CB6CCF')
TELEMINT_COLLECTIONS = [
Address('EQAOQdwdw8kGftJCSFgOErM1mBjYPe4DBPq8-AhF6vr9si5N'), # Anonymous Telegram Numbers
Address('EQCA14o1-VWhS2efqoh_9M1b_A9DtKTuoqfmkn83AbJzwnPi') # Telegram Usernames
]
TELEMINT_CODE_HASHES = [
'i3EeaAlnLAM13GHR2h3MQFnJcySK9we9WGJFCF6nCzI=', # Telegram gifts
'MNzX1bb89ZaMJ5Hh4xrXMvJnLymG4daXDrA8yiLSMlE=', # Telegram gifts
]

def __init__(self, emulator_path):
super().__init__(emulator_path)
self.code_hash_blacklist = set()
Expand Down Expand Up @@ -165,7 +166,7 @@ def _do_parse(self, obj, db: DB, emulator: TvmEmulator):
if original_address != nft_address:
logger.warning(f"NFT address mismatch: {original_address} != {nft_address}")
return
if collection_address == TON_DNS:
if collection_address == self.TON_DNS:
domain, = self._execute_method(emulator, 'get_domain', [], db, obj)
max_bid_address, max_bid_amount, auction_end_time = self._execute_method(emulator, 'get_auction_info', [], db, obj)
content = {
Expand All @@ -180,7 +181,7 @@ def _do_parse(self, obj, db: DB, emulator: TvmEmulator):
except Exception as e:
logger.warning(f"Failed to get nft content: {e}")
content = {}
if collection_address in TELEMINT_COLLECTIONS or obj['code_hash'] in TELEMINT_CODE_HASHES:
if collection_address in self.TELEMINT_COLLECTIONS or obj['code_hash'] in self.TELEMINT_CODE_HASHES:
try:
bidder_address, bid, bid_ts, min_bid, end_time = self._execute_method(emulator, 'get_telemint_auction_state', [], db, obj)
additional_content['bidder_address'] = bidder_address.load_address().to_str(0).upper()
Expand Down Expand Up @@ -215,3 +216,9 @@ def _do_parse(self, obj, db: DB, emulator: TvmEmulator):

logger.info(f"New NFT discovered: {nft_address}: {index} {collection_address} {owner_address} {obj['last_trans_lt']} {content}")
db.insert_nft_item_v2(nft_address, index, collection_address, owner_address, obj['last_trans_lt'], obj['timestamp'], init != 0, content)


class TestnetNFTItemsParser(NFTItemsParser):
    """NFT items parser configured for the TON testnet.

    Overrides the TON DNS collection address with its testnet counterpart.
    The Telemint collection/code-hash lists are emptied — presumably the
    Telegram auction contracts have no testnet deployment (TODO confirm) —
    which makes the parent's Telemint auction branch unreachable here.
    """
    # Testnet TON DNS collection address (raw form).
    TON_DNS = Address('0:E33ED33A42EB2032059F97D90C706F8400BB256D32139CA707F1564AD699C7DD')
    TELEMINT_COLLECTIONS = []
    TELEMINT_CODE_HASHES = []
9 changes: 7 additions & 2 deletions parser/parsers/jetton_masters/jetton_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
The parser extracts on-chain and off-chain metadata from jetton masters and stores it in the database.
"""
class JettonMastersMetadataParser(Parser):
TONAPI_BASE_URL = "https://tonapi.io/v2/jettons"

def __init__(self, timeout: int = 10, max_attempts: int = 3):
self.timeout = timeout
Expand Down Expand Up @@ -186,7 +187,7 @@ def update_metadata(index, obj, key, prev, source=METADATA_OFFCHAIN):
logger.error(f"Error updating offchain metadata for {address}: {e}")
try:
logger.info(f"Trying to get metadata from TonAPI for {address}")
tonapi_response = requests.get(f"https://tonapi.io/v2/jettons/{address}", timeout=self.timeout, headers={
tonapi_response = requests.get(f"{self.TONAPI_BASE_URL}/{address}", timeout=self.timeout, headers={
"User-Agent": DATALAKE_USER_AGENT,
"Authorization": 'Bearer %s' % os.getenv("TONAPI_API_KEY")
})
Expand All @@ -209,7 +210,7 @@ def update_metadata(index, obj, key, prev, source=METADATA_OFFCHAIN):
if metadata.tonapi_image_url is None:
logger.info(f"Updating tonapi image url for {address}")
try:
tonapi_response = requests.get(f"https://tonapi.io/v2/jettons/{address}", timeout=self.timeout, headers={
tonapi_response = requests.get(f"{self.TONAPI_BASE_URL}/{address}", timeout=self.timeout, headers={
"User-Agent": DATALAKE_USER_AGENT,
"Authorization": 'Bearer %s' % os.getenv("TONAPI_API_KEY")
})
Expand Down Expand Up @@ -249,3 +250,7 @@ def update_metadata(index, obj, key, prev, source=METADATA_OFFCHAIN):
if onchain_updated or offchain_updated:
logger.info(f"Upserting jetton metadata for {address}")
db.upsert_jetton_metadata(metadata, prev_ts_onchain, prev_ts_offchain)


class TestnetJettonMastersMetadataParser(JettonMastersMetadataParser):
    """Jetton masters metadata parser pointed at the testnet TonAPI endpoint."""
    TONAPI_BASE_URL = "https://testnet.tonapi.io/v2/jettons"
26 changes: 15 additions & 11 deletions parser/parsers/message/dedust_swap.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,23 @@
from pytvm.tvm_emulator.tvm_emulator import TvmEmulator


# twin non-stable pools to avoid wrong prices estimation
# TODO - move to a special table?
BLACKLIST = set([
Parser.uf2raw('EQA0a6c40n_Kejx_Wj0vowdeYCFYG9XnLdLMRHihXc27cng5'),
Parser.uf2raw('EQDpuDAY31FH2jM9PysFsmJ3aXMMReGYb_P65aDOXVYDcCJX')
])

DEDUST_FACTORY_ADDRESS = Address('EQBfBWT7X2BHg9tXAxzhz2aKiNTU1tpt5NsiK0uSDW_YAJ67')

class DedustSwap(EmulatorParser):
DEDUST_FACTORY_ADDRESS = Address('EQBfBWT7X2BHg9tXAxzhz2aKiNTU1tpt5NsiK0uSDW_YAJ67')

# twin non-stable pools to avoid wrong prices estimation
# TODO - move to a special table?
BLACKLIST = set([
Parser.uf2raw('EQA0a6c40n_Kejx_Wj0vowdeYCFYG9XnLdLMRHihXc27cng5'),
Parser.uf2raw('EQDpuDAY31FH2jM9PysFsmJ3aXMMReGYb_P65aDOXVYDcCJX')
])

def __init__(self, emulator_path):
EmulatorParser.__init__(self, emulator_path)
self.valid_pools = set()

def prepare(self, db: DB):
EmulatorParser.prepare(self, db)
factory_state = Parser.get_account_state_safe(DEDUST_FACTORY_ADDRESS, db)
factory_state = Parser.get_account_state_safe(self.DEDUST_FACTORY_ADDRESS, db)
self.factory = self._prepare_emulator(factory_state)


Expand All @@ -38,7 +37,7 @@ def predicate(self, obj) -> bool:
return obj.get("opcode", None) == Parser.opcode_signed(0x9c610de3) and \
obj.get("direction", None) == "out" and \
obj.get("destination", 'None') is None and \
not obj.get("source", None) in BLACKLIST
not obj.get("source", None) in self.BLACKLIST

"""
We need to validate that the message produce by valid pool
Expand Down Expand Up @@ -101,3 +100,8 @@ def handle_internal(self, obj, db: DB):
estimate_volume(swap, db)
db.serialize(swap)
db.discover_dex_pool(swap)


class TestnetDedustSwap(DedustSwap):
    """DeDust swap parser configured for the TON testnet.

    Uses the testnet factory address; the mainnet twin non-stable pool
    blacklist does not apply on testnet, so it is emptied.
    """
    DEDUST_FACTORY_ADDRESS = Address('kQBfBWT7X2BHg9tXAxzhz2aKiNTU1tpt5NsiK0uSDW_YACUx')
    BLACKLIST = set()
12 changes: 8 additions & 4 deletions parser/parsers/message/jetton_mint.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
from pytoniq_core import Address


HTON_MASTER = Parser.uf2raw("EQDPdq8xjAhytYqfGSX8KcFWIReCufsB9Wdg0pLlYSO_h76w")

# Mint is not covered by TEP-74 and we should not be strict on the format
class WrongMintFormat(NonCriticalParserError):
pass
Expand Down Expand Up @@ -100,14 +98,16 @@ def handle_internal(self, obj: dict, db: DB):
= InternalMsgBody;
"""
class HipoTokensMinted(Parser):
HTON_MASTER = Parser.uf2raw("EQDPdq8xjAhytYqfGSX8KcFWIReCufsB9Wdg0pLlYSO_h76w")

def topics(self):
return [TOPIC_MESSAGES]

def predicate(self, obj: dict) -> bool:
return (
obj.get("opcode") == Parser.opcode_signed(0x5445efee)
and obj.get("direction") == "in"
and obj.get("source") == HTON_MASTER
and obj.get("source") == self.HTON_MASTER
)

def handle_internal(self, obj: dict, db: DB):
Expand All @@ -118,7 +118,7 @@ def handle_internal(self, obj: dict, db: DB):
amount = cell.load_coins() # tokens:Coins

wallet = db.get_jetton_wallet(Address(Parser.require(obj.get("destination"))))
assert wallet['jetton'] == HTON_MASTER
assert wallet['jetton'] == self.HTON_MASTER

tx = Parser.require(db.get_transaction(Parser.require(obj.get("tx_hash"))))

Expand All @@ -142,3 +142,7 @@ def handle_internal(self, obj: dict, db: DB):
)
logger.info(f"Adding Hipo tokens_minted event {mint}")
db.serialize(mint)


class TestnetHipoTokensMinted(HipoTokensMinted):
    """Hipo tokens_minted parser using the testnet hTON master address."""
    HTON_MASTER = Parser.uf2raw("kQB519C3IXFgCr4qKj6QrtaB9Pm3Sawr-Gonlo3O0cKL_I03")
8 changes: 6 additions & 2 deletions parser/parsers/message/stonfi_swap.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
from model.dexswap import DEX_STON, DexSwapParsed
from parsers.message.swap_volume import estimate_volume

STONFI_ROUTER = Parser.uf2raw('EQB3ncyBUTjZUA5EnFKR5_EnOMI9V1tTEAAPaiU71gc4TiUt')

class StonfiSwap(Parser):
STONFI_ROUTER = Parser.uf2raw('EQB3ncyBUTjZUA5EnFKR5_EnOMI9V1tTEAAPaiU71gc4TiUt')

def topics(self):
return [TOPIC_MESSAGES]
Expand All @@ -17,7 +17,7 @@ def predicate(self, obj) -> bool:
# only internal messages processed by the router
return obj.get("opcode", None) == Parser.opcode_signed(0xf93bb43f) and \
obj.get("direction", None) == "in" and \
obj.get("destination", None) == STONFI_ROUTER
obj.get("destination", None) == self.STONFI_ROUTER


def handle_internal(self, obj, db: DB):
Expand Down Expand Up @@ -116,3 +116,7 @@ def handle_internal(self, obj, db: DB):
estimate_volume(swap, db)
db.serialize(swap)
db.discover_dex_pool(swap)


class TestnetStonfiSwap(StonfiSwap):
    """Ston.fi swap parser using the testnet router address."""
    STONFI_ROUTER = Parser.uf2raw('kQB3ncyBUTjZUA5EnFKR5_EnOMI9V1tTEAAPaiU71gc4Tp6n')
Loading