diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py index 04fc2ed90..4e244f499 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py @@ -1,12 +1,13 @@ import argparse import logging -from typing import Optional +from typing import Optional, Literal from uuid import UUID from moonstreamdb.blockchain import AvailableBlockchainType from web3 import Web3 from web3.middleware import geth_poa_middleware +from .db import deduplicate_records from ..db import yield_db_session_ctx from ..settings import ( MOONSTREAM_MOONWORM_TASKS_JOURNAL, @@ -341,6 +342,21 @@ def handle_historical_crawl(args: argparse.Namespace) -> None: ) +def handle_deduplicate(args: argparse.Namespace) -> None: + """ + Deduplicate database records + """ + + with yield_db_session_ctx() as db_session: + deduplicate_records( + db_session, + args.blockchain_type, + args.table, + args.label, + args.type, + ) + + def main() -> None: parser = argparse.ArgumentParser() parser.set_defaults(func=lambda _: parser.print_help()) @@ -536,6 +552,49 @@ def main() -> None: ) historical_crawl_parser.set_defaults(func=handle_historical_crawl) + database_cli = subparsers.add_parser("database", help="Database operations") + database_cli.add_argument( + "--blockchain-type", + "-b", + type=str, + help=f"Available blockchain types: {[member.value for member in AvailableBlockchainType]}", + ) + + database_cli.set_defaults(func=lambda _: database_cli.print_help()) + + database_cli_subparsers = database_cli.add_subparsers() + + deduplicate_parser = database_cli_subparsers.add_parser( + "deduplicate", + help="Deduplicate database records", + ) + + deduplicate_parser.add_argument( + "--table", + "-t", + type=str, + choices=["blocks", "labels", "transactions"], + required=True, + help="Table type to deduplicate", + ) + + deduplicate_parser.add_argument( + "--label", + "-l", + type=str, + required=False, + help="Label to deduplicate", + ) + + deduplicate_parser.add_argument( + "--type", + "-y", + type=str, + choices=["event", "function"], + required=True, + help="Type to deduplicate", + ) + args = parser.parse_args() args.func(args) diff --git a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py index 26fd64ef0..fba110eed 100644 --- a/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py +++ b/crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py @@ -1,11 +1,13 @@ import logging import json -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Literal from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model from moonstreamdb.models import Base from moonworm.crawler.function_call_crawler import ContractFunctionCall # type: ignore from sqlalchemy.orm import Session +from sqlalchemy.sql import text + from ..settings import CRAWLER_LABEL from .event_crawler import Event @@ -218,3 +220,141 @@ def add_function_calls_to_session( logger.info(f"Saving {len(labels_to_save)} labels to session") db_session.add_all(labels_to_save) + + +def deduplicate_records( + db_session: Session, + blockchain_type: AvailableBlockchainType, + table: Literal["labels", "transactions", "blocks"], + label_name: Optional[str] = None, + label_type: Optional[str] = None, +) -> None: + """ + Deduplicates records in the database. + label name and label type work only for labels table. + """ + + if table == "blocks": + raise NotImplementedError("Deduplication for blocks is not implemented yet") + + if table == "labels": + label_model = get_label_model(blockchain_type) + + if label_name is None or label_type is None: + raise ValueError( + "label_name and label_type are required for deduplication of labels" + ) + + if label_type == "event": + # get list of all label_type addresses + + all_addresses = ( + db_session.query(label_model.address.label("address")) + .filter(label_model.label == label_name) + .filter(label_model.label_data["type"] == "event") + .distinct() + .all() + ) # can take a while + + for address_raw in all_addresses: + address = address_raw[0] + + deduplicate_records = db_session.execute( + text( + """ + WITH lates_labels AS ( + SELECT + DISTINCT ON (transaction_hash, log_index) transaction_hash, log_index, + block_number as block_number, + created_at as created_at + FROM + {} + WHERE + label=:label + AND address=:address + AND label_data->>'type' = :label_type + ORDER BY + transaction_hash ASC, + log_index ASC, + block_number ASC, + created_at ASC + ) + DELETE FROM + {} USING lates_labels + WHERE + label=:label + AND address=:address + AND label_data->>'type' = :label_type + AND {}.id not in (select id from lates_labels ) RETURNING {}.block_number; + """.format( + table, table, table, table + ) + ), + {"address": address, "label": label_name, "label_type": label_type}, + ) + + db_session.commit() + + logger.info( + f"Deleted {deduplicate_records} duplicate labels for address {address}" + ) + + if label_type == "tx_call": + # get list of all label_type addresses + + all_addresses = ( + db_session.query(label_model.address.label("address")) + .filter(label_model.label == label_name) + .filter(label_model.label_data["type"] == "tx_call") + .distinct() + .all() + ) + + for address_raw in all_addresses: + address = address_raw[0] + + deduplicate_records = db_session.execute( + text( + """ + WITH lates_labels AS ( + SELECT + DISTINCT ON (transaction_hash) transaction_hash, + block_number as block_number, + created_at as created_at + FROM + {} + WHERE + label=:label + AND address=:address + AND label_data->>'type' = :label_type + AND log_index is null + ORDER BY + transaction_hash ASC, + block_number ASC, + created_at ASC + ) + DELETE FROM + {} USING lates_labels + WHERE + label=:label + AND address=:address + AND label_data->>'type' = :label_type + AND log_index is null + AND {}.id not in (select id from lates_labels ) RETURNING {}.block_number; + """.format( + table, table, table, table + ) + ), + {"address": address, "label": label_name, "label_type": label_type}, + ) + + db_session.commit() + + logger.info( + f"Deleted {deduplicate_records} duplicate labels for address {address}" + ) + + if table == "transactions": + raise NotImplementedError( + "Deduplication for transactions is not implemented yet" + )