diff --git a/packages/common/gasoline/core/src/ctx/workflow.rs b/packages/common/gasoline/core/src/ctx/workflow.rs index 7e9d0fc34e..e35f3fc46f 100644 --- a/packages/common/gasoline/core/src/ctx/workflow.rs +++ b/packages/common/gasoline/core/src/ctx/workflow.rs @@ -25,7 +25,7 @@ use crate::{ history::{ History, cursor::{Cursor, HistoryResult}, - event::{EventId, SleepState}, + event::SleepState, location::{Coordinate, Location}, removed::Removed, }, @@ -281,7 +281,6 @@ impl WorkflowCtx { async fn run_activity( &mut self, input: &A::Input, - event_id: &EventId, location: &Location, create_ts: i64, ) -> WorkflowResult { @@ -328,7 +327,7 @@ impl WorkflowCtx { self.workflow_id, location, self.version, - event_id, + A::NAME, create_ts, &input_val, Ok(&output_val), @@ -376,7 +375,7 @@ impl WorkflowCtx { self.workflow_id, location, self.version, - event_id, + A::NAME, create_ts, &input_val, Err(&err_str), @@ -423,7 +422,7 @@ impl WorkflowCtx { self.workflow_id, location, self.version, - event_id, + A::NAME, create_ts, &input_val, Err(&err_str), @@ -576,9 +575,9 @@ impl WorkflowCtx { { self.check_stop()?; - let event_id = EventId::new(I::Activity::NAME, &input); - - let history_res = self.cursor.compare_activity(self.version, &event_id)?; + let history_res = self + .cursor + .compare_activity(self.version, I::Activity::NAME)?; let location = self.cursor.current_location_for(&history_res); // Activity was ran before @@ -605,7 +604,7 @@ impl WorkflowCtx { } match self - .run_activity::(&input, &event_id, &location, activity.create_ts) + .run_activity::(&input, &location, activity.create_ts) .await { Err(err) => { @@ -647,13 +646,8 @@ impl WorkflowCtx { } // This is a new activity else { - self.run_activity::( - &input, - &event_id, - &location, - rivet_util::timestamp::now(), - ) - .await? + self.run_activity::(&input, &location, rivet_util::timestamp::now()) + .await? }; // Move to next event diff --git a/packages/common/gasoline/core/src/db/kv/debug.rs b/packages/common/gasoline/core/src/db/kv/debug.rs index 50ffc08b0b..d68601483c 100644 --- a/packages/common/gasoline/core/src/db/kv/debug.rs +++ b/packages/common/gasoline/core/src/db/kv/debug.rs @@ -849,13 +849,6 @@ impl DatabaseDebug for DatabaseKv { .unpack::(entry.key()) { current_event.output_chunks.push(entry); - } else if let Ok(key) = self - .subspace - .unpack::(entry.key()) - { - let input_hash = key.deserialize(entry.value())?; - - current_event.input_hash = Some(input_hash); } else if let Ok(key) = self.subspace.unpack::(entry.key()) { @@ -1191,7 +1184,6 @@ struct WorkflowHistoryEventBuilder { input_chunks: Vec, output_chunks: Vec, tags: Vec<(String, String)>, - input_hash: Option>, errors: Vec, iteration: Option, deadline_ts: Option, @@ -1213,7 +1205,6 @@ impl WorkflowHistoryEventBuilder { input_chunks: Vec::new(), output_chunks: Vec::new(), tags: Vec::new(), - input_hash: None, errors: Vec::new(), iteration: None, deadline_ts: None, diff --git a/packages/common/gasoline/core/src/db/kv/keys/history.rs b/packages/common/gasoline/core/src/db/kv/keys/history.rs index e47b201c2b..9787355f96 100644 --- a/packages/common/gasoline/core/src/db/kv/keys/history.rs +++ b/packages/common/gasoline/core/src/db/kv/keys/history.rs @@ -763,68 +763,6 @@ impl<'de> TupleUnpack<'de> for OutputChunkKey { } } -#[derive(Debug)] -pub struct InputHashKey { - workflow_id: Id, - location: Location, - forgotten: bool, -} - -impl InputHashKey { - pub fn new(workflow_id: Id, location: Location) -> Self { - InputHashKey { - workflow_id, - location, - forgotten: false, - } - } -} - -impl FormalKey for InputHashKey { - type Value = Vec; - - fn deserialize(&self, raw: &[u8]) -> Result { - Ok(raw.to_vec()) - } - - fn serialize(&self, value: Self::Value) -> Result> { - Ok(value) - } -} - -impl TuplePack for InputHashKey { - fn pack( - &self, - w: &mut W, - tuple_depth: TupleDepth, - ) -> std::io::Result { - pack_history_key( - self.workflow_id, - &self.location, - w, - tuple_depth, - self.forgotten, - INPUT_HASH, - ) - } -} - -impl<'de> TupleUnpack<'de> for InputHashKey { - fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { - let (input, (workflow_id, location, forgotten)) = - unpack_history_key(input, tuple_depth, INPUT_HASH, "INPUT_HASH")?; - - Ok(( - input, - InputHashKey { - workflow_id, - location, - forgotten, - }, - )) - } -} - #[derive(Debug)] pub struct ErrorKey { workflow_id: Id, @@ -1511,7 +1449,6 @@ pub mod insert { version: usize, create_ts: i64, activity_name: &str, - input_hash: &[u8], input: &serde_json::value::RawValue, res: std::result::Result<&serde_json::value::RawValue, &str>, ) -> Result<()> { @@ -1531,12 +1468,6 @@ pub mod insert { &activity_name_key.serialize(activity_name.to_string())?, ); - let input_hash_key = super::InputHashKey::new(workflow_id, location.clone()); - tx.set( - &subspace.pack(&input_hash_key), - &input_hash_key.serialize(input_hash.to_vec())?, - ); - let input_key = super::InputKey::new(workflow_id, location.clone()); // Write input diff --git a/packages/common/gasoline/core/src/db/kv/mod.rs b/packages/common/gasoline/core/src/db/kv/mod.rs index 9c2db70896..bb8e5f8b5f 100644 --- a/packages/common/gasoline/core/src/db/kv/mod.rs +++ b/packages/common/gasoline/core/src/db/kv/mod.rs @@ -28,8 +28,8 @@ use crate::{ error::{WorkflowError, WorkflowResult}, history::{ event::{ - ActivityEvent, Event, EventData, EventId, EventType, LoopEvent, MessageSendEvent, - RemovedEvent, SignalEvent, SignalSendEvent, SleepEvent, SleepState, SubWorkflowEvent, + ActivityEvent, Event, EventData, EventType, LoopEvent, MessageSendEvent, RemovedEvent, + SignalEvent, SignalSendEvent, SleepEvent, SleepState, SubWorkflowEvent, }, location::Location, }, @@ -1333,13 +1333,6 @@ impl Database for DatabaseKv { entry.key(), ) { current_event.output_chunks.push(entry); - } else if let Ok(key) = - self.subspace.unpack::( - entry.key(), - ) { - let input_hash = key.deserialize(entry.value())?; - - current_event.input_hash = Some(input_hash); } else if let Ok(_key) = self.subspace .unpack::(entry.key()) @@ -2336,7 +2329,7 @@ impl Database for DatabaseKv { from_workflow_id: Id, location: &Location, version: usize, - event_id: &EventId, + name: &str, create_ts: i64, input: &serde_json::value::RawValue, res: Result<&serde_json::value::RawValue, &str>, @@ -2353,8 +2346,7 @@ impl Database for DatabaseKv { location, version, create_ts, - &event_id.name, - &event_id.input_hash.to_be_bytes(), + name, input, res, )?; @@ -2718,7 +2710,6 @@ struct WorkflowHistoryEventBuilder { sub_workflow_id: Option, input_chunks: Vec, output_chunks: Vec, - input_hash: Option>, error_count: usize, iteration: Option, deadline_ts: Option, @@ -2738,7 +2729,6 @@ impl WorkflowHistoryEventBuilder { sub_workflow_id: None, input_chunks: Vec::new(), output_chunks: Vec::new(), - input_hash: None, error_count: 0, iteration: None, deadline_ts: None, @@ -2786,12 +2776,7 @@ impl TryFrom for ActivityEvent { fn try_from(value: WorkflowHistoryEventBuilder) -> WorkflowResult { Ok(ActivityEvent { - event_id: EventId::from_be_bytes( - value.name.ok_or(WorkflowError::MissingEventData("name"))?, - value - .input_hash - .ok_or(WorkflowError::MissingEventData("hash"))?, - )?, + name: value.name.ok_or(WorkflowError::MissingEventData("name"))?, create_ts: value .create_ts .ok_or(WorkflowError::MissingEventData("create_ts"))?, diff --git a/packages/common/gasoline/core/src/db/mod.rs b/packages/common/gasoline/core/src/db/mod.rs index 444600998e..81af4f6b24 100644 --- a/packages/common/gasoline/core/src/db/mod.rs +++ b/packages/common/gasoline/core/src/db/mod.rs @@ -8,7 +8,7 @@ use serde::de::DeserializeOwned; use crate::{ error::{WorkflowError, WorkflowResult}, history::{ - event::{Event, EventId, EventType, SleepState}, + event::{Event, EventType, SleepState}, location::Location, }, workflow::Workflow, @@ -205,7 +205,7 @@ pub trait Database: Send { workflow_id: Id, location: &Location, version: usize, - event_id: &EventId, + name: &str, create_ts: i64, input: &serde_json::value::RawValue, output: Result<&serde_json::value::RawValue, &str>, diff --git a/packages/common/gasoline/core/src/history/cursor.rs b/packages/common/gasoline/core/src/history/cursor.rs index 629f1305c3..825d995571 100644 --- a/packages/common/gasoline/core/src/history/cursor.rs +++ b/packages/common/gasoline/core/src/history/cursor.rs @@ -3,8 +3,8 @@ use std::cmp::Ordering; use super::{ History, event::{ - ActivityEvent, Event, EventData, EventId, EventType, LoopEvent, MessageSendEvent, - SignalEvent, SignalSendEvent, SleepEvent, SubWorkflowEvent, + ActivityEvent, Event, EventData, EventType, LoopEvent, MessageSendEvent, SignalEvent, + SignalSendEvent, SleepEvent, SubWorkflowEvent, }, location::{Coordinate, Location}, removed::Removed, @@ -187,7 +187,7 @@ impl Cursor { pub fn compare_activity( &self, version: usize, - event_id: &EventId, + name: &str, ) -> WorkflowResult> { if let Some(event) = self.current_event() { if version > event.version { @@ -200,7 +200,7 @@ impl Cursor { event.data, event.version, self.current_location(), - event_id.name, + name, version, ))); } @@ -211,18 +211,16 @@ impl Cursor { "expected {} at {}, found activity {:?}", event.data, self.current_location(), - event_id.name + name, ))); }; - if &activity.event_id != event_id { + if &activity.name != name { return Err(WorkflowError::HistoryDiverged(format!( - "expected activity {:?}#{:x} at {}, found activity {:?}#{:x}", - activity.event_id.name, - activity.event_id.input_hash, + "expected activity {:?} at {}, found activity {:?}", + activity.name, self.current_location(), - event_id.name, - event_id.input_hash, + name, ))); } @@ -536,7 +534,7 @@ impl Cursor { match T::event_type() { EventType::Activity => { if let EventData::Activity(activity) = &event.data { - T::name().expect("bad impl") == activity.event_id.name + T::name().expect("bad impl") == activity.name } else { false } diff --git a/packages/common/gasoline/core/src/history/event.rs b/packages/common/gasoline/core/src/history/event.rs index 0983156c84..7d3f7300ef 100644 --- a/packages/common/gasoline/core/src/history/event.rs +++ b/packages/common/gasoline/core/src/history/event.rs @@ -1,10 +1,7 @@ -use std::{ - hash::{DefaultHasher, Hash, Hasher}, - ops::Deref, -}; +use std::ops::Deref; use rivet_util::Id; -use serde::{Deserialize, Serialize, de::DeserializeOwned}; +use serde::de::DeserializeOwned; use strum::FromRepr; use super::location::Coordinate; @@ -62,7 +59,7 @@ pub enum EventData { impl std::fmt::Display for EventData { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match &self { - EventData::Activity(activity) => write!(f, "activity {:?}", activity.event_id.name), + EventData::Activity(activity) => write!(f, "activity {:?}", activity.name), EventData::Signal(signal) => write!(f, "signal {:?}", signal.name), EventData::SignalSend(signal_send) => write!(f, "signal send {:?}", signal_send.name), EventData::MessageSend(message_send) => { @@ -120,7 +117,7 @@ impl std::fmt::Display for EventType { #[derive(Debug)] pub struct ActivityEvent { - pub event_id: EventId, + pub name: String, pub create_ts: i64, /// If the activity succeeds, this will be some. @@ -211,45 +208,3 @@ pub struct RemovedEvent { pub event_type: EventType, pub name: Option, } - -/// Based on the name of the event and the hash of the input (if it has one). -#[derive(Deserialize, Serialize, Debug, Clone, Eq, PartialEq)] -pub struct EventId { - pub name: String, - pub input_hash: u64, -} - -impl EventId { - pub fn new(name: &str, input: I) -> Self { - let mut hasher = DefaultHasher::new(); - input.hash(&mut hasher); - let input_hash = hasher.finish(); - - Self { - name: name.to_string(), - input_hash, - } - } - - pub fn from_le_bytes(name: String, input_hash: Vec) -> WorkflowResult { - Ok(EventId { - name, - input_hash: u64::from_le_bytes( - input_hash - .try_into() - .map_err(|_| WorkflowError::IntegerConversion)?, - ), - }) - } - - pub fn from_be_bytes(name: String, input_hash: Vec) -> WorkflowResult { - Ok(EventId { - name, - input_hash: u64::from_be_bytes( - input_hash - .try_into() - .map_err(|_| WorkflowError::IntegerConversion)?, - ), - }) - } -} diff --git a/packages/common/gasoline/core/src/stub.rs b/packages/common/gasoline/core/src/stub.rs index 3c50183524..2f8fd0d48c 100644 --- a/packages/common/gasoline/core/src/stub.rs +++ b/packages/common/gasoline/core/src/stub.rs @@ -13,7 +13,7 @@ use crate::{ ctx::WorkflowCtx, error::WorkflowResult, executable::{AsyncResult, Executable}, - history::{event::EventId, removed::Removed}, + history::removed::Removed, }; // Must wrap all closures being used as executables in this function due to @@ -52,10 +52,9 @@ where } fn shift_cursor(&self, ctx: &mut WorkflowCtx) -> WorkflowResult<()> { - let event_id = EventId::new(I::Activity::NAME, &self.inner); let history_res = ctx .cursor() - .compare_activity(self.version.unwrap_or(ctx.version()), &event_id)?; + .compare_activity(self.version.unwrap_or(ctx.version()), &I::Activity::NAME)?; let location = ctx.cursor().current_location_for(&history_res); ctx.cursor_mut().update(&location); diff --git a/packages/common/gasoline/macros/src/lib.rs b/packages/common/gasoline/macros/src/lib.rs index 178aec078e..9bc8f3525e 100644 --- a/packages/common/gasoline/macros/src/lib.rs +++ b/packages/common/gasoline/macros/src/lib.rs @@ -56,6 +56,16 @@ pub fn workflow(attr: TokenStream, item: TokenStream) -> TokenStream { let struct_ident = Ident::new(&name, proc_macro2::Span::call_site()); let fn_name = item_fn.sig.ident.to_string(); + if !fn_name + .chars() + .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_') + { + return error( + item_fn.sig.ident.span(), + "invalid workflow name, must be [a-z0-9_]", + ); + } + let fn_body = item_fn.block; let vis = item_fn.vis; @@ -107,6 +117,16 @@ pub fn activity(attr: TokenStream, item: TokenStream) -> TokenStream { let struct_ident = Ident::new(&name, proc_macro2::Span::call_site()); let fn_name = item_fn.sig.ident.to_string(); + if !fn_name + .chars() + .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_') + { + return error( + item_fn.sig.ident.span(), + "invalid activity name, must be [a-z0-9_]", + ); + } + let fn_body = item_fn.block; let vis = item_fn.vis; @@ -146,6 +166,7 @@ pub fn operation(attr: TokenStream, item: TokenStream) -> TokenStream { .ident .map(|x| x.to_string()) .unwrap_or_else(|| "Operation".to_string()); + let item_fn = parse_macro_input!(item as ItemFn); let config = match parse_config(&item_fn.attrs) { @@ -166,6 +187,16 @@ pub fn operation(attr: TokenStream, item: TokenStream) -> TokenStream { let struct_ident = Ident::new(&name, proc_macro2::Span::call_site()); let fn_name = item_fn.sig.ident.to_string(); + if !fn_name + .chars() + .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_') + { + return error( + item_fn.sig.ident.span(), + "invalid operation name, must be [a-z0-9_]", + ); + } + let generics = &item_fn.sig.generics; let fn_body = item_fn.block; let vis = item_fn.vis; diff --git a/packages/common/universaldb/src/driver/rocksdb/database.rs b/packages/common/universaldb/src/driver/rocksdb/database.rs index 28575beefd..96768a28d2 100644 --- a/packages/common/universaldb/src/driver/rocksdb/database.rs +++ b/packages/common/universaldb/src/driver/rocksdb/database.rs @@ -35,6 +35,7 @@ impl RocksDbDatabaseDriver { opts.set_max_total_wal_size(64 * 1024 * 1024); // 64MB // Open the OptimisticTransactionDB + tracing::debug!(path=%db_path.display(), "opening rocksdb"); let db = OptimisticTransactionDB::open(&opts, db_path).context("failed to open rocksdb")?; Ok(RocksDbDatabaseDriver { diff --git a/packages/common/universaldb/src/utils/keys.rs b/packages/common/universaldb/src/utils/keys.rs index e267e80a88..4b6eb847bb 100644 --- a/packages/common/universaldb/src/utils/keys.rs +++ b/packages/common/universaldb/src/utils/keys.rs @@ -85,7 +85,7 @@ define_keys! { (57, FORGOTTEN, "forgotten"), (58, EVENT_TYPE, "event_type"), (59, VERSION, "version"), - (60, INPUT_HASH, "input_hash"), + // 60 (61, SIGNAL_ID, "signal_id"), (62, SUB_WORKFLOW_ID, "sub_workflow_id"), (63, ITERATION, "iteration"), diff --git a/packages/services/pegboard/src/workflows/actor/mod.rs b/packages/services/pegboard/src/workflows/actor/mod.rs index 117cb85b6e..043ca2e802 100644 --- a/packages/services/pegboard/src/workflows/actor/mod.rs +++ b/packages/services/pegboard/src/workflows/actor/mod.rs @@ -46,6 +46,7 @@ pub struct State { pub create_ts: i64, pub create_complete_ts: Option, + #[serde(default)] pub for_serverless: bool, pub start_ts: Option, diff --git a/packages/services/pegboard/src/workflows/runner.rs b/packages/services/pegboard/src/workflows/runner.rs index 7c2dd7ae93..98aa6102a0 100644 --- a/packages/services/pegboard/src/workflows/runner.rs +++ b/packages/services/pegboard/src/workflows/runner.rs @@ -639,7 +639,7 @@ enum RunnerState { } #[activity(ClearDb)] -async fn clear_Db(ctx: &ActivityCtx, input: &ClearDbInput) -> Result<()> { +async fn clear_db(ctx: &ActivityCtx, input: &ClearDbInput) -> Result<()> { let state = ctx.state::()?; let namespace_id = state.namespace_id; let create_ts = state.create_ts;