|
| 1 | +// Copyright © Aptos Foundation |
| 2 | + |
| 3 | +use crate::{ |
| 4 | + utils::{ |
| 5 | + counters::{ |
| 6 | + GOT_CONNECTION_COUNT, PARSER_FAIL_COUNT, PARSER_INVOCATIONS_COUNT, |
| 7 | + PUBSUB_ACK_SUCCESS_COUNT, SKIP_URI_COUNT, UNABLE_TO_GET_CONNECTION_COUNT, |
| 8 | + }, |
| 9 | + database::{check_or_update_chain_id, establish_connection_pool, run_migrations}, |
| 10 | + }, |
| 11 | + worker::Worker, |
| 12 | +}; |
| 13 | +use aptos_indexer_grpc_server_framework::RunnableConfig; |
| 14 | +use bytes::Bytes; |
| 15 | +use diesel::{ |
| 16 | + r2d2::{ConnectionManager, Pool}, |
| 17 | + PgConnection, |
| 18 | +}; |
| 19 | +use google_cloud_storage::client::{Client as GCSClient, ClientConfig as GCSClientConfig}; |
| 20 | +use serde::{Deserialize, Serialize}; |
| 21 | +use std::sync::Arc; |
| 22 | +use tracing::{error, info, warn}; |
| 23 | +use warp::Filter; |
| 24 | + |
/// Structs to hold config from YAML
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct ParserConfig {
    // Optional path to a Google service account key file; when set, run()
    // exports it as GOOGLE_APPLICATION_CREDENTIALS before building the GCS client.
    pub google_application_credentials: Option<String>,
    // GCS bucket used by the crawler (exact usage lives in Worker — not visible here).
    pub bucket: String,
    // Postgres connection string used to build the r2d2 pool.
    pub database_url: String,
    // URL prefixes for serving/fetching assets; consumed by Worker — semantics
    // presumed CDN base URL and IPFS gateway base, verify against Worker.
    pub cdn_prefix: String,
    pub ipfs_prefix: String,
    // Optional auth key for the IPFS gateway — TODO confirm in Worker.
    pub ipfs_auth_key: Option<String>,
    pub max_file_size_bytes: Option<u32>,
    pub image_quality: Option<u8>, // Quality up to 100
    pub max_image_dimensions: Option<u32>,
    // When true, handle_root returns 200 OK so PubSub acks the message;
    // when false/absent it returns 400 BAD_REQUEST.
    pub ack_parsed_uris: Option<bool>,
    // URIs to skip — consumed elsewhere (not referenced in this file).
    pub uri_blacklist: Option<Vec<String>>,
    // Port the warp server binds on (0.0.0.0).
    pub server_port: u16,
}
| 42 | + |
| 43 | +#[async_trait::async_trait] |
| 44 | +impl RunnableConfig for ParserConfig { |
| 45 | + /// Main driver function that establishes a connection to Pubsub and parses the Pubsub entries in parallel |
| 46 | + async fn run(&self) -> anyhow::Result<()> { |
| 47 | + info!( |
| 48 | + "[NFT Metadata Crawler] Starting parser with config: {:?}", |
| 49 | + self |
| 50 | + ); |
| 51 | + |
| 52 | + info!("[NFT Metadata Crawler] Connecting to database"); |
| 53 | + let pool = establish_connection_pool(self.database_url.clone()); |
| 54 | + info!("[NFT Metadata Crawler] Database connection successful"); |
| 55 | + |
| 56 | + info!("[NFT Metadata Crawler] Running migrations"); |
| 57 | + run_migrations(&pool); |
| 58 | + info!("[NFT Metadata Crawler] Finished migrations"); |
| 59 | + |
| 60 | + if let Some(google_application_credentials) = self.google_application_credentials.clone() { |
| 61 | + std::env::set_var( |
| 62 | + "GOOGLE_APPLICATION_CREDENTIALS", |
| 63 | + google_application_credentials, |
| 64 | + ); |
| 65 | + } |
| 66 | + |
| 67 | + // Establish GCS client |
| 68 | + let gcs_config = GCSClientConfig::default() |
| 69 | + .with_auth() |
| 70 | + .await |
| 71 | + .unwrap_or_else(|e| { |
| 72 | + error!( |
| 73 | + error = ?e, |
| 74 | + "[NFT Metadata Crawler] Failed to create gRPC client config" |
| 75 | + ); |
| 76 | + panic!(); |
| 77 | + }); |
| 78 | + |
| 79 | + // Create request context |
| 80 | + let context = Arc::new(ServerContext { |
| 81 | + parser_config: self.clone(), |
| 82 | + pool, |
| 83 | + gcs_client: GCSClient::new(gcs_config), |
| 84 | + }); |
| 85 | + |
| 86 | + // Create web server |
| 87 | + let route = warp::post() |
| 88 | + .and(warp::path::end()) |
| 89 | + .and(warp::body::bytes()) |
| 90 | + .and(warp::any().map(move || context.clone())) |
| 91 | + .and_then(handle_root); |
| 92 | + warp::serve(route) |
| 93 | + .run(([0, 0, 0, 0], self.server_port)) |
| 94 | + .await; |
| 95 | + Ok(()) |
| 96 | + } |
| 97 | + |
| 98 | + fn get_server_name(&self) -> String { |
| 99 | + "parser".to_string() |
| 100 | + } |
| 101 | +} |
| 102 | + |
/// Struct to hold context required for parsing
#[derive(Clone)]
pub struct ServerContext {
    // Config loaded from YAML; cloned into each spawned parser task.
    pub parser_config: ParserConfig,
    // r2d2 Postgres connection pool; one connection is checked out per message.
    pub pool: Pool<ConnectionManager<PgConnection>>,
    // Google Cloud Storage client, cloned into each spawned parser task.
    pub gcs_client: GCSClient,
}
| 110 | + |
| 111 | +/// Repeatedly pulls workers from Channel and perform parsing operations |
| 112 | +async fn spawn_parser( |
| 113 | + parser_config: ParserConfig, |
| 114 | + msg_base64: Bytes, |
| 115 | + pool: Pool<ConnectionManager<PgConnection>>, |
| 116 | + gcs_client: GCSClient, |
| 117 | +) { |
| 118 | + PARSER_INVOCATIONS_COUNT.inc(); |
| 119 | + let pubsub_message = String::from_utf8(msg_base64.to_vec()).unwrap_or_else(|e| { |
| 120 | + error!( |
| 121 | + error = ?e, |
| 122 | + "[NFT Metadata Crawler] Failed to parse PubSub message" |
| 123 | + ); |
| 124 | + panic!(); |
| 125 | + }); |
| 126 | + |
| 127 | + info!( |
| 128 | + pubsub_message = pubsub_message, |
| 129 | + "[NFT Metadata Crawler] Received message from PubSub" |
| 130 | + ); |
| 131 | + |
| 132 | + // Skips message if it does not have 5 commas (likely malformed URI) |
| 133 | + if pubsub_message.matches(',').count() != 5 { |
| 134 | + // Sends ack to PubSub only if ack_parsed_uris flag is true |
| 135 | + info!("[NFT Metadata Crawler] More than 5 commas, skipping message"); |
| 136 | + SKIP_URI_COUNT.with_label_values(&["invalid"]).inc(); |
| 137 | + return; |
| 138 | + } |
| 139 | + |
| 140 | + // Parse PubSub message |
| 141 | + let parts: Vec<&str> = pubsub_message.split(',').collect(); |
| 142 | + |
| 143 | + // Perform chain id check |
| 144 | + // If chain id is not set, set it |
| 145 | + let mut conn = pool.get().unwrap_or_else(|e| { |
| 146 | + error!( |
| 147 | + pubsub_message = pubsub_message, |
| 148 | + error = ?e, |
| 149 | + "[NFT Metadata Crawler] Failed to get DB connection from pool"); |
| 150 | + UNABLE_TO_GET_CONNECTION_COUNT.inc(); |
| 151 | + panic!(); |
| 152 | + }); |
| 153 | + GOT_CONNECTION_COUNT.inc(); |
| 154 | + |
| 155 | + let grpc_chain_id = parts[4].parse::<u64>().unwrap_or_else(|e| { |
| 156 | + error!( |
| 157 | + error = ?e, |
| 158 | + "[NFT Metadata Crawler] Failed to parse chain id from PubSub message" |
| 159 | + ); |
| 160 | + panic!(); |
| 161 | + }); |
| 162 | + |
| 163 | + // Panic if chain id of PubSub message does not match chain id in DB |
| 164 | + check_or_update_chain_id(&mut conn, grpc_chain_id as i64).expect("Chain id should match"); |
| 165 | + |
| 166 | + // Spawn worker |
| 167 | + let mut worker = Worker::new( |
| 168 | + parser_config.clone(), |
| 169 | + conn, |
| 170 | + gcs_client.clone(), |
| 171 | + pubsub_message.clone(), |
| 172 | + parts[0].to_string(), |
| 173 | + parts[1].to_string(), |
| 174 | + parts[2].to_string().parse().unwrap_or_else(|e|{ |
| 175 | + error!( |
| 176 | + error = ?e, |
| 177 | + "[NFT Metadata Crawler] Failed to parse last transaction version from PubSub message" |
| 178 | + ); |
| 179 | + panic!(); |
| 180 | + }), |
| 181 | + chrono::NaiveDateTime::parse_from_str(parts[3], "%Y-%m-%d %H:%M:%S %Z").unwrap_or( |
| 182 | + chrono::NaiveDateTime::parse_from_str(parts[3], "%Y-%m-%d %H:%M:%S%.f %Z").unwrap_or_else( |
| 183 | + |e| { |
| 184 | + error!( |
| 185 | + error = ?e, |
| 186 | + "[NFT Metadata Crawler] Failed to parse timestamp from PubSub message" |
| 187 | + ); |
| 188 | + panic!(); |
| 189 | + }, |
| 190 | + ), |
| 191 | + ), |
| 192 | + parts[5].parse::<bool>().unwrap_or(false), |
| 193 | + ); |
| 194 | + |
| 195 | + info!( |
| 196 | + pubsub_message = pubsub_message, |
| 197 | + "[NFT Metadata Crawler] Starting worker" |
| 198 | + ); |
| 199 | + |
| 200 | + if let Err(e) = worker.parse().await { |
| 201 | + warn!( |
| 202 | + pubsub_message = pubsub_message, |
| 203 | + error = ?e, |
| 204 | + "[NFT Metadata Crawler] Parsing failed" |
| 205 | + ); |
| 206 | + PARSER_FAIL_COUNT.inc(); |
| 207 | + } |
| 208 | + |
| 209 | + info!( |
| 210 | + pubsub_message = pubsub_message, |
| 211 | + "[NFT Metadata Crawler] Worker finished" |
| 212 | + ); |
| 213 | +} |
| 214 | + |
| 215 | +/// Handles calling parser for the root endpoint |
| 216 | +async fn handle_root( |
| 217 | + msg: Bytes, |
| 218 | + context: Arc<ServerContext>, |
| 219 | +) -> Result<impl warp::Reply, warp::Rejection> { |
| 220 | + let to_ack = context.parser_config.ack_parsed_uris.unwrap_or(false); |
| 221 | + |
| 222 | + // Use spawn_blocking to run the function on a separate thread. |
| 223 | + let _ = tokio::spawn(spawn_parser( |
| 224 | + context.parser_config.clone(), |
| 225 | + msg, |
| 226 | + context.pool.clone(), |
| 227 | + context.gcs_client.clone(), |
| 228 | + )) |
| 229 | + .await; |
| 230 | + |
| 231 | + if !to_ack { |
| 232 | + return Ok(warp::reply::with_status( |
| 233 | + warp::reply(), |
| 234 | + warp::http::StatusCode::BAD_REQUEST, |
| 235 | + )); |
| 236 | + } |
| 237 | + |
| 238 | + PUBSUB_ACK_SUCCESS_COUNT.inc(); |
| 239 | + Ok(warp::reply::with_status( |
| 240 | + warp::reply(), |
| 241 | + warp::http::StatusCode::OK, |
| 242 | + )) |
| 243 | +} |
0 commit comments