diff --git a/Cargo.lock b/Cargo.lock index f9b0e9b..3f3d33e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6820,7 +6820,7 @@ checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" [[package]] name = "tuktuk-cli" -version = "0.2.12" +version = "0.2.13" dependencies = [ "anchor-client", "anchor-lang", @@ -6838,6 +6838,7 @@ dependencies = [ "solana-program", "solana-quic-client", "solana-sdk", + "solana-transaction-status-client-types", "solana-transaction-utils", "spl-associated-token-account", "spl-token 4.0.2", diff --git a/tuktuk-cli/Cargo.toml b/tuktuk-cli/Cargo.toml index dd2370f..cebde1a 100644 --- a/tuktuk-cli/Cargo.toml +++ b/tuktuk-cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tuktuk-cli" -version = "0.2.12" +version = "0.2.13" description = "A cli for tuktuk" homepage.workspace = true repository.workspace = true @@ -20,6 +20,7 @@ thiserror = "1" anchor-lang = { workspace = true } solana-quic-client = { workspace = true } solana-client = { workspace = true } +solana-transaction-status-client-types = "2.2.1" anyhow = { workspace = true } clap = { workspace = true } tokio = { workspace = true } diff --git a/tuktuk-cli/src/cmd/task.rs b/tuktuk-cli/src/cmd/task.rs index 81e1e77..dbd7ed5 100644 --- a/tuktuk-cli/src/cmd/task.rs +++ b/tuktuk-cli/src/cmd/task.rs @@ -1,4 +1,4 @@ -use std::{collections::HashSet, sync::Arc}; +use std::{collections::HashSet, sync::Arc, time::Duration}; use anyhow::anyhow; use chrono::{Local, TimeZone}; @@ -7,7 +7,10 @@ use clock::SYSVAR_CLOCK; use futures::stream::StreamExt; use itertools::Itertools; use serde::Serialize; -use solana_client::rpc_config::RpcSimulateTransactionConfig; +use solana_client::{ + rpc_client::GetConfirmedSignaturesForAddress2Config, + rpc_config::{RpcSimulateTransactionConfig, RpcTransactionConfig}, +}; use solana_sdk::{ commitment_config::CommitmentLevel, message::{v0, VersionedMessage}, @@ -15,6 +18,7 @@ use solana_sdk::{ signer::Signer, transaction::VersionedTransaction, }; +use solana_transaction_status_client_types::UiTransactionEncoding; use solana_transaction_utils::{ pack::pack_instructions_into_transactions, priority_fee::auto_compute_limit_and_price, }; @@ -101,6 +105,15 @@ pub enum Cmd { )] failed: bool, }, + Watch { + #[command(flatten)] + task_queue: TaskQueueArg, + #[arg( + long, + help = "Description prefix to watch for (can be specified multiple times)" + )] + description: Vec, + }, } async fn simulate_task(client: &CliClient, task_key: Pubkey) -> Result> { @@ -173,6 +186,108 @@ struct SimulationResult { pub compute_units: Option, } +async fn handle_task_completion(client: &CliClient, task_key: Pubkey, task_id: u16) -> Result { + println!( + "Task {} completed! Getting transaction signature...", + task_id + ); + + // Get the last 10 transaction signatures for this task + let signatures = client + .rpc_client + .get_signatures_for_address_with_config( + &task_key, + GetConfirmedSignaturesForAddress2Config { + limit: Some(10), + ..Default::default() + }, + ) + .await?; + + if signatures.is_empty() { + println!("No transaction signature found for task {}", task_id); + return Ok(()); + } + + // Limit to last 10 transactions + let recent_signatures: Vec = signatures + .iter() + .take(10) + .map(|sig_info| sig_info.signature.parse().unwrap()) + .collect(); + + // Get statuses for all signatures at once + let signature_statuses = client + .rpc_client + .get_signature_statuses_with_history(&recent_signatures) + .await?; + + // Find the first successful transaction + let mut successful_signature = None; + for (i, status_result) in signature_statuses.value.iter().enumerate() { + match status_result { + Some(status) => { + // Check if the transaction was successful (no error) + if status.err.is_none() { + successful_signature = Some(recent_signatures[i].to_string()); + break; + } + } + None => { + // Transaction not found, continue to next + continue; + } + } + } + + if let Some(signature) = successful_signature { + println!("Successful transaction signature: {}", signature); + + // Get the full transaction to extract logs + match client + .rpc_client + .get_transaction_with_config( + &signature.parse()?, + RpcTransactionConfig { + encoding: Some(UiTransactionEncoding::Json), + max_supported_transaction_version: Some(0), + ..Default::default() + }, + ) + .await + { + Ok(tx) => { + if let Some(meta) = tx.transaction.meta { + match meta.log_messages { + solana_transaction_status_client_types::option_serializer::OptionSerializer::Some(logs) => { + println!("Transaction logs:"); + for log in logs { + println!(" {}", log); + } + } + _ => { + println!("No logs found in transaction"); + } + } + } else { + println!("No transaction metadata found"); + } + } + Err(e) => { + println!("Error getting transaction details: {}", e); + } + } + } else { + println!( + "No successful transaction found for task {} (all {} recent transactions failed)", + task_id, + recent_signatures.len() + ); + } + + Ok(()) +} + impl TaskCmd { pub async fn run(&self, opts: Opts) -> Result { match &self.cmd { @@ -593,6 +708,136 @@ impl TaskCmd { println!("New task key: {new_task_key}"); } } + Cmd::Watch { + task_queue, + description, + } => { + if description.is_empty() { + return Err(anyhow!( + "At least one description must be provided for watch command" + )); + } + + let client = opts.client().await?; + let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap(); + let task_queue: TaskQueueV0 = client + .as_ref() + .anchor_account(&task_queue_pubkey) + .await? + .ok_or_else(|| anyhow!("Task queue account not found"))?; + + let trimmed_descriptions: Vec = description + .iter() + .map(|prefix| { + if prefix.len() > 40 { + prefix.chars().take(40).collect() + } else { + prefix.clone() + } + }) + .collect(); + + // First, get and display all existing tasks that match the description prefixes + let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?; + let existing_tasks = client + .as_ref() + .anchor_accounts::(&task_keys) + .await?; + + let mut watched_tasks = std::collections::HashMap::new(); + + // Filter and start watching existing tasks that match our prefixes + for (task_key, maybe_task) in existing_tasks { + if let Some(task) = maybe_task { + // Check if task description matches any of the prefixes + let matches = trimmed_descriptions + .iter() + .any(|prefix| task.description.starts_with(prefix)); + if matches { + println!( + "Found existing matching task: {} (ID: {}, KEY: {})", + task.description, task.id, task_key + ); + watched_tasks.insert(task_key, task.id); + } + } + } + + // Set up pubsub tracker for watching + let (pubsub_client_raw, _pubsub_handle, _shutdown_sender) = + tuktuk_sdk::pubsub_client::PubsubClient::new(client.opts.ws_url().as_str()) + .await?; + let pubsub_client = Arc::new(pubsub_client_raw); + let pubsub_tracker = Arc::new(tuktuk_sdk::watcher::PubsubTracker::new( + client.rpc_client.clone(), + pubsub_client, + Duration::from_secs(30), + solana_sdk::commitment_config::CommitmentConfig::confirmed(), + )); + + // Start watching for task updates + let (stream, _unsub) = tuktuk::task::on_new( + client.as_ref(), + &pubsub_tracker, + &task_queue_pubkey, + &task_queue, + ) + .await?; + println!( + "Watching for tasks with description prefixes: {:?}", + trimmed_descriptions + ); + println!("Press Ctrl+C to stop watching..."); + + let mut stream = Box::pin(stream); + + while let Some(update) = stream.next().await { + match update { + Ok(task_update) => { + // Check for new tasks that match any of our descriptions + for (task_key, maybe_task) in task_update.tasks { + if let Some(task) = maybe_task { + // Check if task description matches any of the prefixes + let matches = trimmed_descriptions + .iter() + .any(|prefix| task.description.starts_with(prefix)); + if matches { + println!( + "Found matching task: {} (ID: {}, KEY: {})", + task.description, task.id, task_key + ); + watched_tasks.insert(task_key, task.id); + } + } else { + // Task was removed (completed) + if let Some(task_id) = watched_tasks.remove(&task_key) { + if let Err(e) = + handle_task_completion(&client, task_key, task_id).await + { + eprintln!("Error handling task completion: {}", e); + } + } + } + } + + // Check for removed tasks + for removed_task_key in task_update.removed { + if let Some(task_id) = watched_tasks.remove(&removed_task_key) { + if let Err(e) = + handle_task_completion(&client, removed_task_key, task_id) + .await + { + eprintln!("Error handling task completion: {}", e); + } + } + } + } + Err(e) => { + eprintln!("Error receiving task update: {}", e); + } + } + } + } } Ok(()) }