From 03c90941bd5d3e838358fe1f5fd875193dc4356c Mon Sep 17 00:00:00 2001 From: g4titanx Date: Sun, 30 Mar 2025 00:31:21 +0100 Subject: [PATCH 1/2] fix cron jobs --- core/application/src/state/executor.rs | 73 ++++++++++++++++--- .../src/state/executor/epoch_change.rs | 15 +++- core/e2e/tests/watcher.rs | 24 ++---- core/types/src/response.rs | 1 + core/types/src/state.rs | 9 ++- core/types/src/transaction.rs | 11 ++- 6 files changed, 94 insertions(+), 39 deletions(-) diff --git a/core/application/src/state/executor.rs b/core/application/src/state/executor.rs index 930fce092..2c70744f3 100644 --- a/core/application/src/state/executor.rs +++ b/core/application/src/state/executor.rs @@ -29,6 +29,8 @@ use lightning_interfaces::types::{ ExecutionData, ExecutionError, Job, + JobInfo, + JobInput, JobStatus, Metadata, MintInfo, @@ -321,7 +323,13 @@ impl StateExecutor { self.update_content_registry(txn.payload.sender, updates) }, UpdateMethod::IncrementNonce {} => TransactionResponse::Success(ExecutionData::None), - UpdateMethod::AddJobs { jobs } => self.add_jobs(jobs), + UpdateMethod::AddJobs { jobs } => { + let sender = match self.only_account_owner(txn.payload.sender) { + Ok(account) => account, + Err(e) => return e, + }; + self.add_jobs(sender, jobs) + }, UpdateMethod::RemoveJobs { jobs } => self.remove_jobs(jobs), UpdateMethod::JobUpdates { updates } => self.update_jobs(updates), }; @@ -1281,18 +1289,43 @@ impl StateExecutor { TransactionResponse::Success(ExecutionData::None) } - fn add_jobs(&self, jobs: Vec) -> TransactionResponse { - let mut jobs = jobs - .into_iter() - .map(|job| (job.hash, job)) - .collect::>(); - let job_hashes = jobs.keys().copied().collect(); - let assigned_jobs = self.assign_jobs(job_hashes); + fn add_jobs(&self, sender: EthAddress, jobs: Vec) -> TransactionResponse { + let mut total_amount = HpUfixed::<18>::zero(); + for job_input in &jobs { + total_amount += job_input.info.amount.clone(); + } - // Record the assignee in the job entry. + let mut account = self.account_info.get(&sender).unwrap_or_default(); + if account.flk_balance < total_amount { + return TransactionResponse::Revert(ExecutionError::InsufficientBalance); + } + account.flk_balance -= total_amount; + self.account_info.set(sender, account); + + let mut job_entries = Vec::new(); + for job_input in jobs { + let hash = self.compute_job_hash(&sender, &job_input.info); + if self.jobs.get(&hash).is_some() { + return TransactionResponse::Revert(ExecutionError::JobAlreadyExists); + } + if !self.is_valid_service_id(job_input.info.service) { + return TransactionResponse::Revert(ExecutionError::InvalidServiceId); + } + let job = Job { + hash, + owner: sender, + info: job_input.info, + status: None, + assignee: None, + }; + job_entries.push((hash, job)); + } + // Rest of the function (assignment and storage) remains unchanged... + let job_hashes = job_entries.iter().map(|(hash, _)| *hash).collect(); + let assigned_jobs = self.assign_jobs(job_hashes); for (index, node_jobs) in assigned_jobs.iter() { for job_hash in node_jobs { - if let Some(job) = jobs.get_mut(job_hash) { + if let Some((_, job)) = job_entries.iter_mut().find(|(h, _)| h == job_hash) { job.assignee = Some(*index); } } @@ -1312,8 +1345,8 @@ impl StateExecutor { } // Save jobs. - for (job_hash, job) in jobs { - self.jobs.set(job_hash, job); + for (hash, job) in job_entries { + self.jobs.set(hash, job); } TransactionResponse::Success(ExecutionData::None) @@ -2040,4 +2073,20 @@ impl StateExecutor { assigned_jobs } + + /// Generates a unique hash based on sender and job details + fn compute_job_hash(&self, owner: &EthAddress, info: &JobInfo) -> [u8; 32] { + let mut hasher = Sha3_256::new(); + hasher.update(owner.0); + hasher.update(&info.frequency.to_le_bytes()); + hasher.update(&info.amount.get_value().to_le_bytes::<32>()); + hasher.update(&info.service.to_le_bytes()); + hasher.update(&info.arguments); + hasher.finalize().into() + } + + /// Validates service IDs early + fn is_valid_service_id(&self, service_id: ServiceId) -> bool { + self.services.get(&service_id).is_some() + } } diff --git a/core/application/src/state/executor/epoch_change.rs b/core/application/src/state/executor/epoch_change.rs index 63d4813f8..312838286 100644 --- a/core/application/src/state/executor/epoch_change.rs +++ b/core/application/src/state/executor/epoch_change.rs @@ -13,6 +13,8 @@ use lightning_interfaces::types::{ Epoch, ExecutionData, ExecutionError, + Job, + JobInput, Metadata, NodeIndex, NodeInfo, @@ -1172,7 +1174,7 @@ impl StateExecutor { fn reassign_jobs(&self) { // Get all current jobs. - let jobs = self + let jobs: Vec = self .jobs .as_map() .values() @@ -1184,11 +1186,16 @@ impl StateExecutor { }) .collect(); - // Clear the tables. + // Clear existing job assignments self.jobs.clear(); self.assigned_jobs.clear(); - // Add these jobs as new jobs. - self.add_jobs(jobs); + // TODO(samuel): Should reassignments during epoch change preserve the + // original job owners, or use a system address? + // Reassign each job with its original owner + for job in jobs { + let job_input = vec![JobInput { info: job.info }]; + self.add_jobs(job.owner, job_input); + } } } diff --git a/core/e2e/tests/watcher.rs b/core/e2e/tests/watcher.rs index fde669b3a..c336762ef 100644 --- a/core/e2e/tests/watcher.rs +++ b/core/e2e/tests/watcher.rs @@ -5,6 +5,7 @@ use axum::{Extension, Router}; use fleek_crypto::NodePublicKey; use fn_sdk::header::{read_header, TransportDetail}; use futures::SinkExt; +use hp_fixed::unsigned::HpUfixed; use lightning_application::app::Application; use lightning_archive::archive::Archive; use lightning_blockstore::blockstore::Blockstore; @@ -15,7 +16,7 @@ use lightning_committee_beacon::CommitteeBeaconComponent; use lightning_e2e::swarm::Swarm; use lightning_forwarder::Forwarder; use lightning_interfaces::fdi::BuildGraph; -use lightning_interfaces::types::{Job, JobInfo, UpdateMethod}; +use lightning_interfaces::types::{JobInfo, JobInput, UpdateMethod}; use lightning_interfaces::{ fdi, partial_node_components, @@ -192,38 +193,29 @@ async fn test_watcher() { let node = pubkeys[2]; // Given: Some jobs. - let job1 = Job { - hash: [0; 32], + let job1 = JobInput { info: JobInfo { frequency: 1, - amount: 0, + amount: HpUfixed::<18>::from(0_u32), service: 0, arguments: vec![0; 4].into_boxed_slice(), }, - status: None, - assignee: None, }; - let job2 = Job { - hash: [1; 32], + let job2 = JobInput { info: JobInfo { frequency: 1, - amount: 0, + amount: HpUfixed::<18>::from(0_u32), service: 0, arguments: vec![1; 4].into_boxed_slice(), }, - status: None, - assignee: None, }; - let job3 = Job { - hash: [2; 32], + let job3 = JobInput { info: JobInfo { frequency: 1, - amount: 0, + amount: HpUfixed::<18>::from(0_u32), service: 0, arguments: vec![2; 4].into_boxed_slice(), }, - status: None, - assignee: None, }; // When: we submit these jobs for execution. diff --git a/core/types/src/response.rs b/core/types/src/response.rs index 6e046cb68..125f7fae8 100644 --- a/core/types/src/response.rs +++ b/core/types/src/response.rs @@ -173,4 +173,5 @@ pub enum ExecutionError { InvalidClientKeyLength, DuplicateClientKey, MissingClientKey, + JobAlreadyExists, } diff --git a/core/types/src/state.rs b/core/types/src/state.rs index 25fc3dc29..6538504da 100644 --- a/core/types/src/state.rs +++ b/core/types/src/state.rs @@ -616,6 +616,8 @@ impl TryFrom for Tokens { pub struct Job { /// The hash of the job. pub hash: [u8; 32], + /// Track the submitter + pub owner: EthAddress, /// Information about the job for execution purposes. pub info: JobInfo, /// The status of the most recent execution of a job. @@ -624,12 +626,17 @@ pub struct Job { pub assignee: Option, } +#[derive(Clone, Debug, Eq, PartialEq, Hash, Deserialize, Serialize, JsonSchema)] +pub struct JobInput { + pub info: JobInfo, +} + #[derive(Clone, Debug, Eq, PartialEq, Hash, Deserialize, Serialize, JsonSchema)] pub struct JobInfo { /// The frequency in which this job should be performed. pub frequency: u32, /// Amount prepaid. - pub amount: u32, + pub amount: HpUfixed<18>, /// The service that will execute the function. pub service: ServiceId, /// The arguments for the job. diff --git a/core/types/src/transaction.rs b/core/types/src/transaction.rs index 66f649c9d..c06f1e649 100644 --- a/core/types/src/transaction.rs +++ b/core/types/src/transaction.rs @@ -19,7 +19,7 @@ use serde::{Deserialize, Serialize}; use super::{ Epoch, Event, - Job, + JobInput, JobStatus, ProofOfConsensus, ProofOfMisbehavior, @@ -505,7 +505,7 @@ pub enum UpdateMethod { // instead of `Job` that doesn't expose fields // used for internal purposes, such as `assignee`. /// Add new jobs to the jobs table and assign them to nodes. - AddJobs { jobs: Vec }, + AddJobs { jobs: Vec }, /// Remove these jobs from the jobs table and unassigned them. RemoveJobs { jobs: Vec<[u8; 32]> }, /// Updates about the jobs' most recent executions. @@ -794,14 +794,13 @@ impl ToDigest for UpdatePayload { }, UpdateMethod::AddJobs { jobs } => { transcript_builder = transcript_builder.with("transaction_name", &"add_jobs"); - for job in jobs.iter() { + for (idx, job) in jobs.iter().enumerate() { transcript_builder = transcript_builder - .with_prefix(job.hash.encode_hex()) + .with_prefix(idx.to_string()) .with("service", &job.info.service) .with("frequency", &job.info.frequency) .with("arguments", &job.info.arguments.as_ref()) - .with("amount", &job.info.amount) - .with("assignee", &job.assignee) + .with("amount", &HpUfixedWrapper(job.info.amount.clone())); } }, UpdateMethod::RemoveJobs { jobs } => { From 43053db75e8534160bbf9bbd2fd0904c759f1ffb Mon Sep 17 00:00:00 2001 From: g4titanx Date: Sun, 30 Mar 2025 11:46:33 +0100 Subject: [PATCH 2/2] fix(cron jobs): fix more todos --- core/application/src/state/executor.rs | 27 ++++++++++++++----- .../src/state/executor/epoch_change.rs | 19 ++++++++++--- core/types/src/state.rs | 21 ++++++++++++++- 3 files changed, 57 insertions(+), 10 deletions(-) diff --git a/core/application/src/state/executor.rs b/core/application/src/state/executor.rs index 2c70744f3..a099fcb3a 100644 --- a/core/application/src/state/executor.rs +++ b/core/application/src/state/executor.rs @@ -4,7 +4,7 @@ use std::net::IpAddr; use std::ops::DerefMut; use std::time::Duration; -use ethers::abi::AbiDecode; +use ethers::abi::{AbiDecode, AbiEncode}; use ethers::core::k256::elliptic_curve::rand_core::SeedableRng; use ethers::types::{Transaction as EthersTransaction, H160}; use fleek_crypto::{ @@ -1314,13 +1314,14 @@ impl StateExecutor { let job = Job { hash, owner: sender, - info: job_input.info, + info: job_input.info.clone(), status: None, assignee: None, + prepaid_balance: job_input.info.amount.clone(), // Now valid }; job_entries.push((hash, job)); } - // Rest of the function (assignment and storage) remains unchanged... + let job_hashes = job_entries.iter().map(|(hash, _)| *hash).collect(); let assigned_jobs = self.assign_jobs(job_hashes); for (index, node_jobs) in assigned_jobs.iter() { @@ -1376,6 +1377,20 @@ impl StateExecutor { fn update_jobs(&self, updates: BTreeMap<[u8; 32], JobStatus>) -> TransactionResponse { for (job_hash, status) in updates { if let Some(mut job) = self.jobs.get(&job_hash) { + if status.success && job.prepaid_balance >= job.info.amount { + let node_index = job.assignee.unwrap(); + let node_owner = self.node_info.get(&node_index).unwrap().owner; + let mut node_account = self.account_info.get(&node_owner).unwrap_or_default(); + node_account.flk_balance += job.info.amount.clone(); + job.prepaid_balance -= job.info.amount.clone(); + self.account_info.set(node_owner, node_account); + } else if status.success { + tracing::warn!( + "Job {} succeeded but insufficient prepaid balance: {}", + job_hash.encode_hex(), + job.prepaid_balance + ); + } job.status = Some(status); self.jobs.set(job_hash, job); } @@ -2078,9 +2093,9 @@ impl StateExecutor { fn compute_job_hash(&self, owner: &EthAddress, info: &JobInfo) -> [u8; 32] { let mut hasher = Sha3_256::new(); hasher.update(owner.0); - hasher.update(&info.frequency.to_le_bytes()); - hasher.update(&info.amount.get_value().to_le_bytes::<32>()); - hasher.update(&info.service.to_le_bytes()); + hasher.update(info.frequency.to_le_bytes()); + hasher.update(info.amount.get_value().to_le_bytes::<32>()); + hasher.update(info.service.to_le_bytes()); hasher.update(&info.arguments); hasher.finalize().into() } diff --git a/core/application/src/state/executor/epoch_change.rs b/core/application/src/state/executor/epoch_change.rs index 312838286..8ce38802c 100644 --- a/core/application/src/state/executor/epoch_change.rs +++ b/core/application/src/state/executor/epoch_change.rs @@ -1,6 +1,7 @@ use std::cmp::Ordering; use std::collections::HashMap; +use ethers::abi::AbiEncode; use fleek_crypto::TransactionSender; use fxhash::FxHashMap; use hp_fixed::unsigned::HpUfixed; @@ -1190,12 +1191,24 @@ impl StateExecutor { self.jobs.clear(); self.assigned_jobs.clear(); - // TODO(samuel): Should reassignments during epoch change preserve the - // original job owners, or use a system address? // Reassign each job with its original owner for job in jobs { let job_input = vec![JobInput { info: job.info }]; - self.add_jobs(job.owner, job_input); + match self.add_jobs(job.owner, job_input) { + TransactionResponse::Success(_) => { + if let Some(mut job_entry) = self.jobs.get(&job.hash) { + job_entry.prepaid_balance = job.prepaid_balance; + self.jobs.set(job.hash, job_entry); + } + }, + TransactionResponse::Revert(err) => { + tracing::warn!( + "Failed to reassign job {}: {:?}", + job.hash.encode_hex(), + err + ); + }, + } } } } diff --git a/core/types/src/state.rs b/core/types/src/state.rs index 6538504da..78269669a 100644 --- a/core/types/src/state.rs +++ b/core/types/src/state.rs @@ -624,6 +624,8 @@ pub struct Job { pub status: Option, /// The node to which this job was assigned. pub assignee: Option, + /// Track remaining funds. + pub prepaid_balance: HpUfixed<18>, } #[derive(Clone, Debug, Eq, PartialEq, Hash, Deserialize, Serialize, JsonSchema)] @@ -639,7 +641,24 @@ pub struct JobInfo { pub amount: HpUfixed<18>, /// The service that will execute the function. pub service: ServiceId, - /// The arguments for the job. + /// Arguments for the job, encoded as a JSON string in UTF-8 bytes. + /// + /// This field contains the arguments to be passed to the JavaScript function executed by + /// the service. Clients must encode arguments as a JSON string (e.g., `{"foo": "bar", "baz": + /// 42}`) and convert it to UTF-8 bytes. Nodes will decode these bytes back to a JSON + /// string and parse it for execution. + /// + /// Example in JavaScript: + /// ```javascript + /// const args = { foo: "bar", baz: 42 }; + /// const argsBytes = Buffer.from(JSON.stringify(args), "utf8"); + /// // Use argsBytes in the AddJobs transaction + /// ``` + /// Example in Rust: + /// ```rust + /// let args = serde_json::json!({"foo": "bar", "baz": 42}); + /// let args_bytes: Box<[u8]> = serde_json::to_vec(&args).unwrap().into_boxed_slice(); + /// ``` pub arguments: Box<[u8]>, }