diff --git a/crawlers/mooncrawl/mooncrawl/actions.py b/crawlers/mooncrawl/mooncrawl/actions.py
new file mode 100644
index 000000000..36807ae4c
--- /dev/null
+++ b/crawlers/mooncrawl/mooncrawl/actions.py
@@ -0,0 +1,22 @@
+import logging
+from typing import Any, Dict
+
+import boto3  # type: ignore
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+def push_data_to_bucket(
+    data: Any, key: str, bucket: str, metadata: Dict[str, Any] = {}
+) -> None:
+    s3 = boto3.client("s3")
+    s3.put_object(
+        Body=data,
+        Bucket=bucket,
+        Key=key,
+        ContentType="application/json",
+        Metadata=metadata,
+    )
+
+    logger.info(f"Data pushed to bucket: s3://{bucket}/{key}")
diff --git a/crawlers/mooncrawl/mooncrawl/api.py b/crawlers/mooncrawl/mooncrawl/api.py
index ea0669bf5..047310c23 100644
--- a/crawlers/mooncrawl/mooncrawl/api.py
+++ b/crawlers/mooncrawl/mooncrawl/api.py
@@ -16,6 +16,7 @@
 from sqlalchemy import text
 
 from . import data
+from .queries_crawler.actions import QueryNotValid, query_validation
 from .middleware import MoonstreamHTTPException
 from .settings import (
     BUGOUT_RESOURCE_TYPE_SUBSCRIPTION,
@@ -202,8 +203,8 @@ async def queries_data_update_handler(
     )
 
     try:
-        valid_query = queries.query_validation(request_data.query)
-    except queries.QueryNotValid:
+        valid_query = query_validation(request_data.query)
+    except QueryNotValid:
         logger.error(f"Incorrect query provided with id: {query_id}")
         raise MoonstreamHTTPException(
             status_code=401, detail="Incorrect query provided"
diff --git a/crawlers/mooncrawl/mooncrawl/queries_crawler/__init__.py b/crawlers/mooncrawl/mooncrawl/queries_crawler/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/crawlers/mooncrawl/mooncrawl/queries_crawler/actions.py b/crawlers/mooncrawl/mooncrawl/queries_crawler/actions.py
new file mode 100644
index 000000000..6d6ad9b3b
--- /dev/null
+++ b/crawlers/mooncrawl/mooncrawl/queries_crawler/actions.py
@@ -0,0 +1,166 @@
+import csv
+import json
+import logging
+import re
+import uuid
+from datetime import datetime, timezone
+from enum import Enum
+from io import StringIO
+from typing import Any, Dict, List, Optional, Tuple
+
+from moonstreamdb.db import (
+    MOONSTREAM_DB_URI_READ_ONLY,
+    MOONSTREAM_POOL_SIZE,
+    create_moonstream_engine,
+)
+from sqlalchemy.orm import sessionmaker
+from sqlalchemy.sql import text
+
+from ..reporter import reporter
+from ..settings import (
+    BUGOUT_REQUEST_TIMEOUT_SECONDS,
+    MOONSTREAM_ADMIN_ACCESS_TOKEN,
+    MOONSTREAM_QUERIES_JOURNAL_ID,
+    MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS,
+    bugout_client,
+)
+from ..stats_worker.queries import to_json_types
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+QUERY_REGEX = re.compile("[\[\]@#$%^&?;`/]")
+
+
+class OutputType(Enum):
+    NONE = None
+    CSV = "csv"
+    JSON = "json"
+
+
+class QueryNotValid(Exception):
+    """
+    Raised when query validation does not pass.
+    """
+
+
+class QueryNotApproved(Exception):
+    """
+    Raised when query is not approved.
+    """
+
+
+def query_validation(query: str) -> str:
+    """
+    Validate that provided query does not contain restricted symbols.
+    """
+    if QUERY_REGEX.search(query) != None:
+        raise QueryNotValid("Query contains restricted symbols")
+
+    return query
+
+
+def fetch_query_from_journal(query_id: uuid.UUID, allow_not_approved: bool = False):
+    """
+    Fetch query from moonstream-queries journal.
+ """ + query = bugout_client.get_entry( + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + journal_id=MOONSTREAM_QUERIES_JOURNAL_ID, + entry_id=query_id, + timeout=BUGOUT_REQUEST_TIMEOUT_SECONDS, + ) + + approved = False + for tag in query.tags: + if tag == "approved": + approved = True + if not approved and not allow_not_approved: + raise QueryNotApproved("Query not approved") + + return query + + +def fetch_data_from_db( + query_id: str, + query: str, + params: Optional[Dict[str, Any]] = None, +) -> Tuple[Any, Any]: + """ + Fetch data from moonstream database. + """ + engine = create_moonstream_engine( + MOONSTREAM_DB_URI_READ_ONLY, + pool_pre_ping=True, + pool_size=MOONSTREAM_POOL_SIZE, + statement_timeout=MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS, + ) + process_session = sessionmaker(bind=engine) + db_session = process_session() + + time_now = datetime.now(timezone.utc) + + try: + result = db_session.execute(text(query), params) + data_keys = result.keys() + data_rows = result.fetchall() + except Exception as err: + db_session.rollback() + reporter.error_report( + err, + [ + "queries", + "execution", + "db", + f"query_id:{query_id}", + ], + ) + finally: + db_session.close() + + exec_timedelta = datetime.now(timezone.utc) - time_now + logger.info( + f"Database query finished in {int(exec_timedelta.total_seconds())} seconds" + ) + + return data_keys, data_rows + + +def prepare_output( + output_type: OutputType, data_keys: Tuple[Any], data_rows: Tuple[List[Any]] +) -> str: + """ + Parse incoming data from database to proper format OutputType. + """ + + def prepare_dict( + data_temp_keys: Tuple[Any], data_temp_rows: Tuple[List[Any]] + ) -> List[Dict[str, Any]]: + output_raw = [] + for row in data_temp_rows: + data_r = {} + for i, k in enumerate(data_temp_keys): + data_r[k] = to_json_types(row[i]) + output_raw.append(data_r) + + return output_raw + + output: Any = None + if output_type.value == "csv": + csv_buffer = StringIO() + csv_writer = csv.writer(csv_buffer, delimiter=";") + + csv_writer.writerow(data_keys) + csv_writer.writerows(data_rows) + + output = csv_buffer.getvalue().encode("utf-8") + elif output_type.value == "json": + output_raw = prepare_dict(data_keys, data_rows) + output = json.dumps(output_raw).encode("utf-8") + elif output_type.value is None: + output = prepare_dict(data_keys, data_rows) + else: + raise Exception("Unsupported output type") + + return output diff --git a/crawlers/mooncrawl/mooncrawl/queries_crawler/cli.py b/crawlers/mooncrawl/mooncrawl/queries_crawler/cli.py new file mode 100644 index 000000000..0ebcfe7e8 --- /dev/null +++ b/crawlers/mooncrawl/mooncrawl/queries_crawler/cli.py @@ -0,0 +1,91 @@ +import argparse +import logging + +from moonstreamdb.blockchain import AvailableBlockchainType + +from ..actions import push_data_to_bucket +from ..settings import MOONSTREAM_S3_DATA_BUCKET, MOONSTREAM_S3_DATA_BUCKET_PREFIX +from .actions import ( + OutputType, + fetch_data_from_db, + fetch_query_from_journal, + prepare_output, + query_validation, +) + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +def parser_queries_execute_handler(args: argparse.Namespace) -> None: + """ + Execute query from moonstream-queries journal and push to bucket. 
+ """ + try: + query = fetch_query_from_journal(query_id=args.id, allow_not_approved=False) + if args.id != str(query.id): + raise Exception("Proposed query id is not equal to fetch query (entry id)") + + query_content = query_validation(query.content) + data_keys, data_rows = fetch_data_from_db(query_id=args.id, query=query_content) + + output = prepare_output( + output_type=OutputType(args.output), + data_keys=data_keys, + data_rows=data_rows, + ) + + if args.output is not None: + if args.upload: + bucket_metadata = {"source": "queries-crawler"} + push_data_to_bucket( + data=output, + key=f"{MOONSTREAM_S3_DATA_BUCKET_PREFIX}/queries/{str(args.id)}/data.{args.output}", + bucket=MOONSTREAM_S3_DATA_BUCKET, + metadata=bucket_metadata, + ) + else: + print(output) + except Exception as e: + logger.error(e) + + +def main() -> None: + parser = argparse.ArgumentParser(description="Moonstream query crawlers CLI") + parser.set_defaults(func=lambda _: parser.print_help()) + subcommands = parser.add_subparsers(description="Query crawlers commands") + + parser_queries_execute = subcommands.add_parser( + "execute", description="Execute query" + ) + parser_queries_execute.add_argument( + "-i", + "--id", + required=True, + help="Query id (entry id in moonstream-queries journal)", + ) + parser_queries_execute.add_argument( + "-o", + "--output", + default=OutputType.NONE.value, + help=f"Available output types: {[member.value for member in OutputType]}", + ) + parser_queries_execute.add_argument( + "-u", + "--upload", + action="store_true", + help="Set this flag to push output into AWS S3 bucket", + ) + parser_queries_execute.add_argument( + "--blockchain", + required=True, + help=f"Available blockchain types: {[member.value for member in AvailableBlockchainType]}", + ) + parser_queries_execute.set_defaults(func=parser_queries_execute_handler) + + args = parser.parse_args() + args.func(args) + + +if __name__ == "__main__": + main() diff --git a/crawlers/mooncrawl/mooncrawl/stats_worker/test_queries.py b/crawlers/mooncrawl/mooncrawl/queries_crawler/test_validation.py similarity index 66% rename from crawlers/mooncrawl/mooncrawl/stats_worker/test_queries.py rename to crawlers/mooncrawl/mooncrawl/queries_crawler/test_validation.py index a71911d7e..f97715b9b 100644 --- a/crawlers/mooncrawl/mooncrawl/stats_worker/test_queries.py +++ b/crawlers/mooncrawl/mooncrawl/queries_crawler/test_validation.py @@ -1,15 +1,15 @@ import unittest -from . 
+from .actions import QueryNotValid, query_validation
 
 
 class TestQueries(unittest.TestCase):
     def test_query_validation(self):
         q = "SELECT * FROM ethereum_blocks"
-        self.assertEqual(queries.query_validation(q), q)
+        self.assertEqual(query_validation(q), q)
 
         q = "select count(*), tx_dublicates from ( select count(*) as tx_dublicates from polygon_labels where address = '0x123' and label_data->>'name' = 'Transfer' group by transaction_hash, log_index order by tx_dublicates desc) as dublicates group by dublicates.tx_dublicates"
-        self.assertEqual(queries.query_validation(q), q)
+        self.assertEqual(query_validation(q), q)
 
         q = """
 Select difference.address,
@@ -41,25 +41,25 @@ def test_query_validation(self):
 ) difference
 order by count desc
         """
-        self.assertEqual(queries.query_validation(q), q)
+        self.assertEqual(query_validation(q), q)
 
-        with self.assertRaises(queries.QueryNotValid):
-            queries.query_validation("SELECT hash FROM ethereum_transaction;")
+        with self.assertRaises(QueryNotValid):
+            query_validation("SELECT hash FROM ethereum_transaction;")
 
-        with self.assertRaises(queries.QueryNotValid):
-            queries.query_validation("%20UNION")
+        with self.assertRaises(QueryNotValid):
+            query_validation("%20UNION")
 
-        with self.assertRaises(queries.QueryNotValid):
-            queries.query_validation("?id=1")
+        with self.assertRaises(QueryNotValid):
+            query_validation("?id=1")
 
-        with self.assertRaises(queries.QueryNotValid):
-            queries.query_validation("FROM`")
+        with self.assertRaises(QueryNotValid):
+            query_validation("FROM`")
 
-        with self.assertRaises(queries.QueryNotValid):
-            queries.query_validation("WHERE login='[USER]'")
+        with self.assertRaises(QueryNotValid):
+            query_validation("WHERE login='[USER]'")
 
-        with self.assertRaises(queries.QueryNotValid):
-            queries.query_validation("OR(1=1)#")
+        with self.assertRaises(QueryNotValid):
+            query_validation("OR(1=1)#")
 
-        with self.assertRaises(queries.QueryNotValid):
-            queries.query_validation("/etc/hosts")
+        with self.assertRaises(QueryNotValid):
+            query_validation("/etc/hosts")
diff --git a/crawlers/mooncrawl/mooncrawl/settings.py b/crawlers/mooncrawl/mooncrawl/settings.py
index 1cb640733..c9dc61470 100644
--- a/crawlers/mooncrawl/mooncrawl/settings.py
+++ b/crawlers/mooncrawl/mooncrawl/settings.py
@@ -127,7 +127,10 @@
     "MOONSTREAM_MOONWORM_TASKS_JOURNAL environment variable must be set"
 )
 
 
-# queries
+# Queries
+MOONSTREAM_QUERIES_JOURNAL_ID = os.environ.get("MOONSTREAM_QUERIES_JOURNAL_ID", "")
+if MOONSTREAM_QUERIES_JOURNAL_ID == "":
+    raise ValueError("MOONSTREAM_QUERIES_JOURNAL_ID environment variable must be set")
 MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS = 30000
 MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS_RAW = os.environ.get(
@@ -147,8 +150,6 @@
 MOONSTREAM_S3_QUERIES_BUCKET = os.environ.get("MOONSTREAM_S3_QUERIES_BUCKET", "")
 if MOONSTREAM_S3_QUERIES_BUCKET == "":
     raise ValueError("MOONSTREAM_S3_QUERIES_BUCKET environment variable must be set")
-
-
 MOONSTREAM_S3_QUERIES_BUCKET_PREFIX = os.environ.get(
     "MOONSTREAM_S3_QUERIES_BUCKET_PREFIX", ""
 )
@@ -157,6 +158,17 @@
     "MOONSTREAM_S3_QUERIES_BUCKET_PREFIX environment variable must be set"
 )
 
+MOONSTREAM_S3_DATA_BUCKET = os.environ.get("MOONSTREAM_S3_DATA_BUCKET", "")
+if MOONSTREAM_S3_DATA_BUCKET == "":
+    raise ValueError("MOONSTREAM_S3_DATA_BUCKET environment variable must be set")
+MOONSTREAM_S3_DATA_BUCKET_PREFIX = os.environ.get(
+    "MOONSTREAM_S3_DATA_BUCKET_PREFIX", ""
+)
+if MOONSTREAM_S3_DATA_BUCKET_PREFIX == "":
+    raise ValueError(
+        "MOONSTREAM_S3_DATA_BUCKET_PREFIX environment variable must be set"
must be set" + ) + # Node balancer NB_ACCESS_ID_HEADER = os.environ.get("NB_ACCESS_ID_HEADER", "x-node-balancer-access-id") NB_DATA_SOURCE_HEADER = os.environ.get( diff --git a/crawlers/mooncrawl/mooncrawl/stats_worker/queries.py b/crawlers/mooncrawl/mooncrawl/stats_worker/queries.py index 8ea20099c..8e2dacf88 100644 --- a/crawlers/mooncrawl/mooncrawl/stats_worker/queries.py +++ b/crawlers/mooncrawl/mooncrawl/stats_worker/queries.py @@ -1,61 +1,31 @@ import csv import json import logging -import re from io import StringIO from typing import Any, Dict, Optional -import boto3 # type: ignore from moonstreamdb.db import ( - create_moonstream_engine, MOONSTREAM_DB_URI_READ_ONLY, MOONSTREAM_POOL_SIZE, + create_moonstream_engine, ) from sqlalchemy.orm import sessionmaker -from ..reporter import reporter +from ..actions import push_data_to_bucket +from ..reporter import reporter from ..settings import ( - MOONSTREAM_S3_QUERIES_BUCKET_PREFIX, MOONSTREAM_QUERY_API_DB_STATEMENT_TIMEOUT_MILLIS, + MOONSTREAM_S3_QUERIES_BUCKET_PREFIX, ) logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -QUERY_REGEX = re.compile("[\[\]@#$%^&?;`/]") - -class QueryNotValid(Exception): - """ - Raised when query validation not passed. - """ - - -def push_statistics(s3: Any, data: Any, key: str, bucket: str) -> None: - - s3.put_object( - Body=data, - Bucket=bucket, - Key=key, - ContentType="application/json", - Metadata={"drone_query": "data"}, - ) - - logger.info(f"Statistics push to bucket: s3://{bucket}/{key}") - - -def query_validation(query: str) -> str: +def to_json_types(value): """ - Sanitize provided query. + Validate types from database to json types. """ - if QUERY_REGEX.search(query) != None: - raise QueryNotValid("Query contains restricted symbols") - - return query - - -def to_json_types(value): - if isinstance(value, (str, int, tuple, list, dict)): return value elif isinstance(value, set): @@ -74,8 +44,6 @@ def data_generate( """ Generate query and push it to S3 """ - s3 = boto3.client("s3") - # Create session engine = create_moonstream_engine( MOONSTREAM_DB_URI_READ_ONLY, @@ -86,6 +54,8 @@ def data_generate( process_session = sessionmaker(bind=engine) db_session = process_session() + bucket_metadata = {"drone_query": "data"} + try: if file_type == "csv": csv_buffer = StringIO() @@ -97,11 +67,11 @@ def data_generate( csv_writer.writerow(result.keys()) csv_writer.writerows(result.fetchAll()) - push_statistics( - s3=s3, + push_data_to_bucket( data=csv_buffer.getvalue().encode("utf-8"), key=f"queries/{query_id}/data.{file_type}", bucket=bucket, + metadata=bucket_metadata, ) else: block_number, block_timestamp = db_session.execute( @@ -118,11 +88,12 @@ def data_generate( ], } ).encode("utf-8") - push_statistics( - s3=s3, + + push_data_to_bucket( data=data, key=f"{MOONSTREAM_S3_QUERIES_BUCKET_PREFIX}/queries/{query_id}/data.{file_type}", bucket=bucket, + metadata=bucket_metadata, ) except Exception as err: db_session.rollback() diff --git a/crawlers/mooncrawl/mooncrawl/version.py b/crawlers/mooncrawl/mooncrawl/version.py index 99d4f3305..b1322bcb3 100644 --- a/crawlers/mooncrawl/mooncrawl/version.py +++ b/crawlers/mooncrawl/mooncrawl/version.py @@ -2,4 +2,4 @@ Moonstream crawlers version. 
""" -MOONCRAWL_VERSION = "0.2.4" +MOONCRAWL_VERSION = "0.2.5" diff --git a/crawlers/mooncrawl/sample.env b/crawlers/mooncrawl/sample.env index 23ee4108d..d2f9feac9 100644 --- a/crawlers/mooncrawl/sample.env +++ b/crawlers/mooncrawl/sample.env @@ -13,6 +13,7 @@ export MOONSTREAM_HUMBUG_TOKEN="" export MOONSTREAM_DATA_JOURNAL_ID="" export MOONSTREAM_MOONWORM_TASKS_JOURNAL="" export MOONSTREAM_ADMIN_ACCESS_TOKEN="" +export MOONSTREAM_QUERIES_JOURNAL_ID="" export NFT_HUMBUG_TOKEN="" # Blockchain nodes environment variables @@ -27,6 +28,8 @@ export MOONSTREAM_S3_SMARTCONTRACTS_BUCKET="" export MOONSTREAM_S3_SMARTCONTRACTS_ABI_PREFIX="" export MOONSTREAM_S3_QUERIES_BUCKET="" export MOONSTREAM_S3_QUERIES_BUCKET_PREFIX="dev" +export MOONSTREAM_S3_DATA_BUCKET="" +export MOONSTREAM_S3_DATA_BUCKET_PREFIX="dev" # 3rd parties environment variables export MOONSTREAM_ETHERSCAN_TOKEN="" diff --git a/crawlers/mooncrawl/setup.py b/crawlers/mooncrawl/setup.py index b6efe31f7..0ef4b1ac3 100644 --- a/crawlers/mooncrawl/setup.py +++ b/crawlers/mooncrawl/setup.py @@ -64,6 +64,7 @@ "statistics=mooncrawl.stats_worker.dashboard:main", "state-crawler=mooncrawl.state_crawler.cli:main", "metadata-crawler=mooncrawl.metadata_crawler.cli:main", + "queries-crawler=mooncrawl.queries_crawler.cli:main", ] }, )