Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion tuktuk-cli/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "tuktuk-cli"
version = "0.2.12"
version = "0.2.13"
description = "A cli for tuktuk"
homepage.workspace = true
repository.workspace = true
Expand All @@ -20,6 +20,7 @@ thiserror = "1"
anchor-lang = { workspace = true }
solana-quic-client = { workspace = true }
solana-client = { workspace = true }
solana-transaction-status-client-types = "2.2.1"
anyhow = { workspace = true }
clap = { workspace = true }
tokio = { workspace = true }
Expand Down
249 changes: 247 additions & 2 deletions tuktuk-cli/src/cmd/task.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashSet, sync::Arc};
use std::{collections::HashSet, sync::Arc, time::Duration};

use anyhow::anyhow;
use chrono::{Local, TimeZone};
Expand All @@ -7,14 +7,18 @@ use clock::SYSVAR_CLOCK;
use futures::stream::StreamExt;
use itertools::Itertools;
use serde::Serialize;
use solana_client::rpc_config::RpcSimulateTransactionConfig;
use solana_client::{
rpc_client::GetConfirmedSignaturesForAddress2Config,
rpc_config::{RpcSimulateTransactionConfig, RpcTransactionConfig},
};
use solana_sdk::{
commitment_config::CommitmentLevel,
message::{v0, VersionedMessage},
pubkey::Pubkey,
signer::Signer,
transaction::VersionedTransaction,
};
use solana_transaction_status_client_types::UiTransactionEncoding;
use solana_transaction_utils::{
pack::pack_instructions_into_transactions, priority_fee::auto_compute_limit_and_price,
};
Expand Down Expand Up @@ -101,6 +105,15 @@ pub enum Cmd {
)]
failed: bool,
},
Watch {
#[command(flatten)]
task_queue: TaskQueueArg,
#[arg(
long,
help = "Description prefix to watch for (can be specified multiple times)"
)]
description: Vec<String>,
},
}

async fn simulate_task(client: &CliClient, task_key: Pubkey) -> Result<Option<SimulationResult>> {
Expand Down Expand Up @@ -173,6 +186,108 @@ struct SimulationResult {
pub compute_units: Option<u64>,
}

async fn handle_task_completion(client: &CliClient, task_key: Pubkey, task_id: u16) -> Result {
println!(
"Task {} completed! Getting transaction signature...",
task_id
);

// Get the last 10 transaction signatures for this task
let signatures = client
.rpc_client
.get_signatures_for_address_with_config(
&task_key,
GetConfirmedSignaturesForAddress2Config {
limit: Some(10),
..Default::default()
},
)
.await?;

if signatures.is_empty() {
println!("No transaction signature found for task {}", task_id);
return Ok(());
}

// Limit to last 10 transactions
let recent_signatures: Vec<solana_sdk::signature::Signature> = signatures
.iter()
.take(10)
.map(|sig_info| sig_info.signature.parse().unwrap())
.collect();

// Get statuses for all signatures at once
let signature_statuses = client
.rpc_client
.get_signature_statuses_with_history(&recent_signatures)
.await?;

// Find the first successful transaction
let mut successful_signature = None;
for (i, status_result) in signature_statuses.value.iter().enumerate() {
match status_result {
Some(status) => {
// Check if the transaction was successful (no error)
if status.err.is_none() {
successful_signature = Some(recent_signatures[i].to_string());
break;
}
}
None => {
// Transaction not found, continue to next
continue;
}
}
}

if let Some(signature) = successful_signature {
println!("Successful transaction signature: {}", signature);

// Get the full transaction to extract logs
match client
.rpc_client
.get_transaction_with_config(
&signature.parse()?,
RpcTransactionConfig {
encoding: Some(UiTransactionEncoding::Json),
max_supported_transaction_version: Some(0),
..Default::default()
},
)
.await
{
Ok(tx) => {
if let Some(meta) = tx.transaction.meta {
match meta.log_messages {
solana_transaction_status_client_types::option_serializer::OptionSerializer::Some(logs) => {
println!("Transaction logs:");
for log in logs {
println!(" {}", log);
}
}
_ => {
println!("No logs found in transaction");
}
}
} else {
println!("No transaction metadata found");
}
}
Err(e) => {
println!("Error getting transaction details: {}", e);
}
}
} else {
println!(
"No successful transaction found for task {} (all {} recent transactions failed)",
task_id,
recent_signatures.len()
);
}

Ok(())
}

impl TaskCmd {
pub async fn run(&self, opts: Opts) -> Result {
match &self.cmd {
Expand Down Expand Up @@ -593,6 +708,136 @@ impl TaskCmd {
println!("New task key: {new_task_key}");
}
}
Cmd::Watch {
task_queue,
description,
} => {
if description.is_empty() {
return Err(anyhow!(
"At least one description must be provided for watch command"
));
}

let client = opts.client().await?;
let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
let task_queue: TaskQueueV0 = client
.as_ref()
.anchor_account(&task_queue_pubkey)
.await?
.ok_or_else(|| anyhow!("Task queue account not found"))?;

let trimmed_descriptions: Vec<String> = description
.iter()
.map(|prefix| {
if prefix.len() > 40 {
prefix.chars().take(40).collect()
} else {
prefix.clone()
}
})
.collect();

// First, get and display all existing tasks that match the description prefixes
let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
let existing_tasks = client
.as_ref()
.anchor_accounts::<TaskV0>(&task_keys)
.await?;

let mut watched_tasks = std::collections::HashMap::new();

// Filter and start watching existing tasks that match our prefixes
for (task_key, maybe_task) in existing_tasks {
if let Some(task) = maybe_task {
// Check if task description matches any of the prefixes
let matches = trimmed_descriptions
.iter()
.any(|prefix| task.description.starts_with(prefix));
if matches {
println!(
"Found existing matching task: {} (ID: {}, KEY: {})",
task.description, task.id, task_key
);
watched_tasks.insert(task_key, task.id);
}
}
}

// Set up pubsub tracker for watching
let (pubsub_client_raw, _pubsub_handle, _shutdown_sender) =
tuktuk_sdk::pubsub_client::PubsubClient::new(client.opts.ws_url().as_str())
.await?;
let pubsub_client = Arc::new(pubsub_client_raw);
let pubsub_tracker = Arc::new(tuktuk_sdk::watcher::PubsubTracker::new(
client.rpc_client.clone(),
pubsub_client,
Duration::from_secs(30),
solana_sdk::commitment_config::CommitmentConfig::confirmed(),
));

// Start watching for task updates
let (stream, _unsub) = tuktuk::task::on_new(
client.as_ref(),
&pubsub_tracker,
&task_queue_pubkey,
&task_queue,
)
.await?;
println!(
"Watching for tasks with description prefixes: {:?}",
trimmed_descriptions
);
println!("Press Ctrl+C to stop watching...");

let mut stream = Box::pin(stream);

while let Some(update) = stream.next().await {
match update {
Ok(task_update) => {
// Check for new tasks that match any of our descriptions
for (task_key, maybe_task) in task_update.tasks {
if let Some(task) = maybe_task {
// Check if task description matches any of the prefixes
let matches = trimmed_descriptions
.iter()
.any(|prefix| task.description.starts_with(prefix));
if matches {
println!(
"Found matching task: {} (ID: {}, KEY: {})",
task.description, task.id, task_key
);
watched_tasks.insert(task_key, task.id);
}
} else {
// Task was removed (completed)
if let Some(task_id) = watched_tasks.remove(&task_key) {
if let Err(e) =
handle_task_completion(&client, task_key, task_id).await
{
eprintln!("Error handling task completion: {}", e);
}
}
}
}

// Check for removed tasks
for removed_task_key in task_update.removed {
if let Some(task_id) = watched_tasks.remove(&removed_task_key) {
if let Err(e) =
handle_task_completion(&client, removed_task_key, task_id)
.await
{
eprintln!("Error handling task completion: {}", e);
}
}
}
}
Err(e) => {
eprintln!("Error receiving task update: {}", e);
}
}
}
}
}
Ok(())
}
Expand Down