diff --git a/core/application/src/state/executor.rs b/core/application/src/state/executor.rs index 930fce092..a099fcb3a 100644 --- a/core/application/src/state/executor.rs +++ b/core/application/src/state/executor.rs @@ -4,7 +4,7 @@ use std::net::IpAddr; use std::ops::DerefMut; use std::time::Duration; -use ethers::abi::AbiDecode; +use ethers::abi::{AbiDecode, AbiEncode}; use ethers::core::k256::elliptic_curve::rand_core::SeedableRng; use ethers::types::{Transaction as EthersTransaction, H160}; use fleek_crypto::{ @@ -29,6 +29,8 @@ use lightning_interfaces::types::{ ExecutionData, ExecutionError, Job, + JobInfo, + JobInput, JobStatus, Metadata, MintInfo, @@ -321,7 +323,13 @@ impl StateExecutor { self.update_content_registry(txn.payload.sender, updates) }, UpdateMethod::IncrementNonce {} => TransactionResponse::Success(ExecutionData::None), - UpdateMethod::AddJobs { jobs } => self.add_jobs(jobs), + UpdateMethod::AddJobs { jobs } => { + let sender = match self.only_account_owner(txn.payload.sender) { + Ok(account) => account, + Err(e) => return e, + }; + self.add_jobs(sender, jobs) + }, UpdateMethod::RemoveJobs { jobs } => self.remove_jobs(jobs), UpdateMethod::JobUpdates { updates } => self.update_jobs(updates), }; @@ -1281,18 +1289,44 @@ impl StateExecutor { TransactionResponse::Success(ExecutionData::None) } - fn add_jobs(&self, jobs: Vec) -> TransactionResponse { - let mut jobs = jobs - .into_iter() - .map(|job| (job.hash, job)) - .collect::>(); - let job_hashes = jobs.keys().copied().collect(); - let assigned_jobs = self.assign_jobs(job_hashes); + fn add_jobs(&self, sender: EthAddress, jobs: Vec) -> TransactionResponse { + let mut total_amount = HpUfixed::<18>::zero(); + for job_input in &jobs { + total_amount += job_input.info.amount.clone(); + } + + let mut account = self.account_info.get(&sender).unwrap_or_default(); + if account.flk_balance < total_amount { + return TransactionResponse::Revert(ExecutionError::InsufficientBalance); + } + account.flk_balance -= total_amount; + self.account_info.set(sender, account); - // Record the assignee in the job entry. + let mut job_entries = Vec::new(); + for job_input in jobs { + let hash = self.compute_job_hash(&sender, &job_input.info); + if self.jobs.get(&hash).is_some() { + return TransactionResponse::Revert(ExecutionError::JobAlreadyExists); + } + if !self.is_valid_service_id(job_input.info.service) { + return TransactionResponse::Revert(ExecutionError::InvalidServiceId); + } + let job = Job { + hash, + owner: sender, + info: job_input.info.clone(), + status: None, + assignee: None, + prepaid_balance: job_input.info.amount.clone(), // Now valid + }; + job_entries.push((hash, job)); + } + + let job_hashes = job_entries.iter().map(|(hash, _)| *hash).collect(); + let assigned_jobs = self.assign_jobs(job_hashes); for (index, node_jobs) in assigned_jobs.iter() { for job_hash in node_jobs { - if let Some(job) = jobs.get_mut(job_hash) { + if let Some((_, job)) = job_entries.iter_mut().find(|(h, _)| h == job_hash) { job.assignee = Some(*index); } } @@ -1312,8 +1346,8 @@ impl StateExecutor { } // Save jobs. - for (job_hash, job) in jobs { - self.jobs.set(job_hash, job); + for (hash, job) in job_entries { + self.jobs.set(hash, job); } TransactionResponse::Success(ExecutionData::None) @@ -1343,6 +1377,20 @@ impl StateExecutor { fn update_jobs(&self, updates: BTreeMap<[u8; 32], JobStatus>) -> TransactionResponse { for (job_hash, status) in updates { if let Some(mut job) = self.jobs.get(&job_hash) { + if status.success && job.prepaid_balance >= job.info.amount { + let node_index = job.assignee.unwrap(); + let node_owner = self.node_info.get(&node_index).unwrap().owner; + let mut node_account = self.account_info.get(&node_owner).unwrap_or_default(); + node_account.flk_balance += job.info.amount.clone(); + job.prepaid_balance -= job.info.amount.clone(); + self.account_info.set(node_owner, node_account); + } else if status.success { + tracing::warn!( + "Job {} succeeded but insufficient prepaid balance: {}", + job_hash.encode_hex(), + job.prepaid_balance + ); + } job.status = Some(status); self.jobs.set(job_hash, job); } @@ -2040,4 +2088,20 @@ impl StateExecutor { assigned_jobs } + + /// Generates a unique hash based on sender and job details + fn compute_job_hash(&self, owner: &EthAddress, info: &JobInfo) -> [u8; 32] { + let mut hasher = Sha3_256::new(); + hasher.update(owner.0); + hasher.update(info.frequency.to_le_bytes()); + hasher.update(info.amount.get_value().to_le_bytes::<32>()); + hasher.update(info.service.to_le_bytes()); + hasher.update(&info.arguments); + hasher.finalize().into() + } + + /// Validates service IDs early + fn is_valid_service_id(&self, service_id: ServiceId) -> bool { + self.services.get(&service_id).is_some() + } } diff --git a/core/application/src/state/executor/epoch_change.rs b/core/application/src/state/executor/epoch_change.rs index 63d4813f8..8ce38802c 100644 --- a/core/application/src/state/executor/epoch_change.rs +++ b/core/application/src/state/executor/epoch_change.rs @@ -1,6 +1,7 @@ use std::cmp::Ordering; use std::collections::HashMap; +use ethers::abi::AbiEncode; use fleek_crypto::TransactionSender; use fxhash::FxHashMap; use hp_fixed::unsigned::HpUfixed; @@ -13,6 +14,8 @@ use lightning_interfaces::types::{ Epoch, ExecutionData, ExecutionError, + Job, + JobInput, Metadata, NodeIndex, NodeInfo, @@ -1172,7 +1175,7 @@ impl StateExecutor { fn reassign_jobs(&self) { // Get all current jobs. - let jobs = self + let jobs: Vec = self .jobs .as_map() .values() @@ -1184,11 +1187,28 @@ impl StateExecutor { }) .collect(); - // Clear the tables. + // Clear existing job assignments self.jobs.clear(); self.assigned_jobs.clear(); - // Add these jobs as new jobs. - self.add_jobs(jobs); + // Reassign each job with its original owner + for job in jobs { + let job_input = vec![JobInput { info: job.info }]; + match self.add_jobs(job.owner, job_input) { + TransactionResponse::Success(_) => { + if let Some(mut job_entry) = self.jobs.get(&job.hash) { + job_entry.prepaid_balance = job.prepaid_balance; + self.jobs.set(job.hash, job_entry); + } + }, + TransactionResponse::Revert(err) => { + tracing::warn!( + "Failed to reassign job {}: {:?}", + job.hash.encode_hex(), + err + ); + }, + } + } } } diff --git a/core/e2e/tests/watcher.rs b/core/e2e/tests/watcher.rs index fde669b3a..c336762ef 100644 --- a/core/e2e/tests/watcher.rs +++ b/core/e2e/tests/watcher.rs @@ -5,6 +5,7 @@ use axum::{Extension, Router}; use fleek_crypto::NodePublicKey; use fn_sdk::header::{read_header, TransportDetail}; use futures::SinkExt; +use hp_fixed::unsigned::HpUfixed; use lightning_application::app::Application; use lightning_archive::archive::Archive; use lightning_blockstore::blockstore::Blockstore; @@ -15,7 +16,7 @@ use lightning_committee_beacon::CommitteeBeaconComponent; use lightning_e2e::swarm::Swarm; use lightning_forwarder::Forwarder; use lightning_interfaces::fdi::BuildGraph; -use lightning_interfaces::types::{Job, JobInfo, UpdateMethod}; +use lightning_interfaces::types::{JobInfo, JobInput, UpdateMethod}; use lightning_interfaces::{ fdi, partial_node_components, @@ -192,38 +193,29 @@ async fn test_watcher() { let node = pubkeys[2]; // Given: Some jobs. - let job1 = Job { - hash: [0; 32], + let job1 = JobInput { info: JobInfo { frequency: 1, - amount: 0, + amount: HpUfixed::<18>::from(0_u32), service: 0, arguments: vec![0; 4].into_boxed_slice(), }, - status: None, - assignee: None, }; - let job2 = Job { - hash: [1; 32], + let job2 = JobInput { info: JobInfo { frequency: 1, - amount: 0, + amount: HpUfixed::<18>::from(0_u32), service: 0, arguments: vec![1; 4].into_boxed_slice(), }, - status: None, - assignee: None, }; - let job3 = Job { - hash: [2; 32], + let job3 = JobInput { info: JobInfo { frequency: 1, - amount: 0, + amount: HpUfixed::<18>::from(0_u32), service: 0, arguments: vec![2; 4].into_boxed_slice(), }, - status: None, - assignee: None, }; // When: we submit these jobs for execution. diff --git a/core/types/src/response.rs b/core/types/src/response.rs index 6e046cb68..125f7fae8 100644 --- a/core/types/src/response.rs +++ b/core/types/src/response.rs @@ -173,4 +173,5 @@ pub enum ExecutionError { InvalidClientKeyLength, DuplicateClientKey, MissingClientKey, + JobAlreadyExists, } diff --git a/core/types/src/state.rs b/core/types/src/state.rs index 25fc3dc29..78269669a 100644 --- a/core/types/src/state.rs +++ b/core/types/src/state.rs @@ -616,12 +616,21 @@ impl TryFrom for Tokens { pub struct Job { /// The hash of the job. pub hash: [u8; 32], + /// Track the submitter + pub owner: EthAddress, /// Information about the job for execution purposes. pub info: JobInfo, /// The status of the most recent execution of a job. pub status: Option, /// The node to which this job was assigned. pub assignee: Option, + /// Track remaining funds. + pub prepaid_balance: HpUfixed<18>, +} + +#[derive(Clone, Debug, Eq, PartialEq, Hash, Deserialize, Serialize, JsonSchema)] +pub struct JobInput { + pub info: JobInfo, } #[derive(Clone, Debug, Eq, PartialEq, Hash, Deserialize, Serialize, JsonSchema)] @@ -629,10 +638,27 @@ pub struct JobInfo { /// The frequency in which this job should be performed. pub frequency: u32, /// Amount prepaid. - pub amount: u32, + pub amount: HpUfixed<18>, /// The service that will execute the function. pub service: ServiceId, - /// The arguments for the job. + /// Arguments for the job, encoded as a JSON string in UTF-8 bytes. + /// + /// This field contains the arguments to be passed to the JavaScript function executed by + /// the service. Clients must encode arguments as a JSON string (e.g., `{"foo": "bar", "baz": + /// 42}`) and convert it to UTF-8 bytes. Nodes will decode these bytes back to a JSON + /// string and parse it for execution. + /// + /// Example in JavaScript: + /// ```javascript + /// const args = { foo: "bar", baz: 42 }; + /// const argsBytes = Buffer.from(JSON.stringify(args), "utf8"); + /// // Use argsBytes in the AddJobs transaction + /// ``` + /// Example in Rust: + /// ```rust + /// let args = serde_json::json!({"foo": "bar", "baz": 42}); + /// let args_bytes: Box<[u8]> = serde_json::to_vec(&args).unwrap().into_boxed_slice(); + /// ``` pub arguments: Box<[u8]>, } diff --git a/core/types/src/transaction.rs b/core/types/src/transaction.rs index 66f649c9d..c06f1e649 100644 --- a/core/types/src/transaction.rs +++ b/core/types/src/transaction.rs @@ -19,7 +19,7 @@ use serde::{Deserialize, Serialize}; use super::{ Epoch, Event, - Job, + JobInput, JobStatus, ProofOfConsensus, ProofOfMisbehavior, @@ -505,7 +505,7 @@ pub enum UpdateMethod { // instead of `Job` that doesn't expose fields // used for internal purposes, such as `assignee`. /// Add new jobs to the jobs table and assign them to nodes. - AddJobs { jobs: Vec }, + AddJobs { jobs: Vec }, /// Remove these jobs from the jobs table and unassigned them. RemoveJobs { jobs: Vec<[u8; 32]> }, /// Updates about the jobs' most recent executions. @@ -794,14 +794,13 @@ impl ToDigest for UpdatePayload { }, UpdateMethod::AddJobs { jobs } => { transcript_builder = transcript_builder.with("transaction_name", &"add_jobs"); - for job in jobs.iter() { + for (idx, job) in jobs.iter().enumerate() { transcript_builder = transcript_builder - .with_prefix(job.hash.encode_hex()) + .with_prefix(idx.to_string()) .with("service", &job.info.service) .with("frequency", &job.info.frequency) .with("arguments", &job.info.arguments.as_ref()) - .with("amount", &job.info.amount) - .with("assignee", &job.assignee) + .with("amount", &HpUfixedWrapper(job.info.amount.clone())); } }, UpdateMethod::RemoveJobs { jobs } => {