Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 10 additions & 16 deletions packages/common/gasoline/core/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
history::{
History,
cursor::{Cursor, HistoryResult},
event::{EventId, SleepState},
event::SleepState,
location::{Coordinate, Location},
removed::Removed,
},
Expand Down Expand Up @@ -281,7 +281,6 @@
async fn run_activity<A: Activity>(
&mut self,
input: &A::Input,
event_id: &EventId,
location: &Location,
create_ts: i64,
) -> WorkflowResult<A::Output> {
Expand Down Expand Up @@ -328,7 +327,7 @@
self.workflow_id,
location,
self.version,
event_id,
A::NAME,
create_ts,
&input_val,
Ok(&output_val),
Expand Down Expand Up @@ -376,7 +375,7 @@
self.workflow_id,
location,
self.version,
event_id,
A::NAME,
create_ts,
&input_val,
Err(&err_str),
Expand Down Expand Up @@ -423,7 +422,7 @@
self.workflow_id,
location,
self.version,
event_id,
A::NAME,
create_ts,
&input_val,
Err(&err_str),
Expand Down Expand Up @@ -554,7 +553,7 @@
impl WorkflowCtx {
/// Creates a sub workflow builder.
pub fn workflow<I>(
&mut self,

Check warning on line 556 in packages/common/gasoline/core/src/ctx/workflow.rs

View workflow job for this annotation

GitHub Actions / Test

hiding a lifetime that's elided elsewhere is confusing
input: impl WorkflowRepr<I>,
) -> builder::sub_workflow::SubWorkflowBuilder<impl WorkflowRepr<I>, I>
where
Expand All @@ -576,9 +575,9 @@
{
self.check_stop()?;

let event_id = EventId::new(I::Activity::NAME, &input);

let history_res = self.cursor.compare_activity(self.version, &event_id)?;
let history_res = self
.cursor
.compare_activity(self.version, I::Activity::NAME)?;
let location = self.cursor.current_location_for(&history_res);

// Activity was ran before
Expand All @@ -605,7 +604,7 @@
}

match self
.run_activity::<I::Activity>(&input, &event_id, &location, activity.create_ts)
.run_activity::<I::Activity>(&input, &location, activity.create_ts)
.await
{
Err(err) => {
Expand Down Expand Up @@ -647,13 +646,8 @@
}
// This is a new activity
else {
self.run_activity::<I::Activity>(
&input,
&event_id,
&location,
rivet_util::timestamp::now(),
)
.await?
self.run_activity::<I::Activity>(&input, &location, rivet_util::timestamp::now())
.await?
};

// Move to next event
Expand Down Expand Up @@ -702,7 +696,7 @@
// }

/// Creates a signal builder.
pub fn signal<T: Signal + Serialize>(&mut self, body: T) -> builder::signal::SignalBuilder<T> {

Check warning on line 699 in packages/common/gasoline/core/src/ctx/workflow.rs

View workflow job for this annotation

GitHub Actions / Test

hiding a lifetime that's elided elsewhere is confusing
builder::signal::SignalBuilder::new(self, self.version, body)
}

Expand Down Expand Up @@ -831,7 +825,7 @@
}

/// Creates a message builder.
pub fn msg<M: Message>(&mut self, body: M) -> builder::message::MessageBuilder<M> {

Check warning on line 828 in packages/common/gasoline/core/src/ctx/workflow.rs

View workflow job for this annotation

GitHub Actions / Test

hiding a lifetime that's elided elsewhere is confusing
builder::message::MessageBuilder::new(self, self.version, body)
}

Expand Down
9 changes: 0 additions & 9 deletions packages/common/gasoline/core/src/db/kv/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -849,13 +849,6 @@ impl DatabaseDebug for DatabaseKv {
.unpack::<keys::history::OutputChunkKey>(entry.key())
{
current_event.output_chunks.push(entry);
} else if let Ok(key) = self
.subspace
.unpack::<keys::history::InputHashKey>(entry.key())
{
let input_hash = key.deserialize(entry.value())?;

current_event.input_hash = Some(input_hash);
} else if let Ok(key) =
self.subspace.unpack::<keys::history::ErrorKey>(entry.key())
{
Expand Down Expand Up @@ -1191,7 +1184,6 @@ struct WorkflowHistoryEventBuilder {
input_chunks: Vec<Value>,
output_chunks: Vec<Value>,
tags: Vec<(String, String)>,
input_hash: Option<Vec<u8>>,
errors: Vec<ActivityError>,
iteration: Option<usize>,
deadline_ts: Option<i64>,
Expand All @@ -1213,7 +1205,6 @@ impl WorkflowHistoryEventBuilder {
input_chunks: Vec::new(),
output_chunks: Vec::new(),
tags: Vec::new(),
input_hash: None,
errors: Vec::new(),
iteration: None,
deadline_ts: None,
Expand Down
69 changes: 0 additions & 69 deletions packages/common/gasoline/core/src/db/kv/keys/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -763,68 +763,6 @@ impl<'de> TupleUnpack<'de> for OutputChunkKey {
}
}

#[derive(Debug)]
pub struct InputHashKey {
workflow_id: Id,
location: Location,
forgotten: bool,
}

impl InputHashKey {
pub fn new(workflow_id: Id, location: Location) -> Self {
InputHashKey {
workflow_id,
location,
forgotten: false,
}
}
}

impl FormalKey for InputHashKey {
type Value = Vec<u8>;

fn deserialize(&self, raw: &[u8]) -> Result<Self::Value> {
Ok(raw.to_vec())
}

fn serialize(&self, value: Self::Value) -> Result<Vec<u8>> {
Ok(value)
}
}

impl TuplePack for InputHashKey {
fn pack<W: std::io::Write>(
&self,
w: &mut W,
tuple_depth: TupleDepth,
) -> std::io::Result<VersionstampOffset> {
pack_history_key(
self.workflow_id,
&self.location,
w,
tuple_depth,
self.forgotten,
INPUT_HASH,
)
}
}

impl<'de> TupleUnpack<'de> for InputHashKey {
fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> {
let (input, (workflow_id, location, forgotten)) =
unpack_history_key(input, tuple_depth, INPUT_HASH, "INPUT_HASH")?;

Ok((
input,
InputHashKey {
workflow_id,
location,
forgotten,
},
))
}
}

#[derive(Debug)]
pub struct ErrorKey {
workflow_id: Id,
Expand Down Expand Up @@ -1511,7 +1449,6 @@ pub mod insert {
version: usize,
create_ts: i64,
activity_name: &str,
input_hash: &[u8],
input: &serde_json::value::RawValue,
res: std::result::Result<&serde_json::value::RawValue, &str>,
) -> Result<()> {
Expand All @@ -1531,12 +1468,6 @@ pub mod insert {
&activity_name_key.serialize(activity_name.to_string())?,
);

let input_hash_key = super::InputHashKey::new(workflow_id, location.clone());
tx.set(
&subspace.pack(&input_hash_key),
&input_hash_key.serialize(input_hash.to_vec())?,
);

let input_key = super::InputKey::new(workflow_id, location.clone());

// Write input
Expand Down
25 changes: 5 additions & 20 deletions packages/common/gasoline/core/src/db/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use crate::{
error::{WorkflowError, WorkflowResult},
history::{
event::{
ActivityEvent, Event, EventData, EventId, EventType, LoopEvent, MessageSendEvent,
RemovedEvent, SignalEvent, SignalSendEvent, SleepEvent, SleepState, SubWorkflowEvent,
ActivityEvent, Event, EventData, EventType, LoopEvent, MessageSendEvent, RemovedEvent,
SignalEvent, SignalSendEvent, SleepEvent, SleepState, SubWorkflowEvent,
},
location::Location,
},
Expand Down Expand Up @@ -1333,13 +1333,6 @@ impl Database for DatabaseKv {
entry.key(),
) {
current_event.output_chunks.push(entry);
} else if let Ok(key) =
self.subspace.unpack::<keys::history::InputHashKey>(
entry.key(),
) {
let input_hash = key.deserialize(entry.value())?;

current_event.input_hash = Some(input_hash);
} else if let Ok(_key) =
self.subspace
.unpack::<keys::history::ErrorKey>(entry.key())
Expand Down Expand Up @@ -2336,7 +2329,7 @@ impl Database for DatabaseKv {
from_workflow_id: Id,
location: &Location,
version: usize,
event_id: &EventId,
name: &str,
create_ts: i64,
input: &serde_json::value::RawValue,
res: Result<&serde_json::value::RawValue, &str>,
Expand All @@ -2353,8 +2346,7 @@ impl Database for DatabaseKv {
location,
version,
create_ts,
&event_id.name,
&event_id.input_hash.to_be_bytes(),
name,
input,
res,
)?;
Expand Down Expand Up @@ -2718,7 +2710,6 @@ struct WorkflowHistoryEventBuilder {
sub_workflow_id: Option<Id>,
input_chunks: Vec<Value>,
output_chunks: Vec<Value>,
input_hash: Option<Vec<u8>>,
error_count: usize,
iteration: Option<usize>,
deadline_ts: Option<i64>,
Expand All @@ -2738,7 +2729,6 @@ impl WorkflowHistoryEventBuilder {
sub_workflow_id: None,
input_chunks: Vec::new(),
output_chunks: Vec::new(),
input_hash: None,
error_count: 0,
iteration: None,
deadline_ts: None,
Expand Down Expand Up @@ -2786,12 +2776,7 @@ impl TryFrom<WorkflowHistoryEventBuilder> for ActivityEvent {

fn try_from(value: WorkflowHistoryEventBuilder) -> WorkflowResult<Self> {
Ok(ActivityEvent {
event_id: EventId::from_be_bytes(
value.name.ok_or(WorkflowError::MissingEventData("name"))?,
value
.input_hash
.ok_or(WorkflowError::MissingEventData("hash"))?,
)?,
name: value.name.ok_or(WorkflowError::MissingEventData("name"))?,
create_ts: value
.create_ts
.ok_or(WorkflowError::MissingEventData("create_ts"))?,
Expand Down
4 changes: 2 additions & 2 deletions packages/common/gasoline/core/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use serde::de::DeserializeOwned;
use crate::{
error::{WorkflowError, WorkflowResult},
history::{
event::{Event, EventId, EventType, SleepState},
event::{Event, EventType, SleepState},
location::Location,
},
workflow::Workflow,
Expand Down Expand Up @@ -205,7 +205,7 @@ pub trait Database: Send {
workflow_id: Id,
location: &Location,
version: usize,
event_id: &EventId,
name: &str,
create_ts: i64,
input: &serde_json::value::RawValue,
output: Result<&serde_json::value::RawValue, &str>,
Expand Down
22 changes: 10 additions & 12 deletions packages/common/gasoline/core/src/history/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use std::cmp::Ordering;
use super::{
History,
event::{
ActivityEvent, Event, EventData, EventId, EventType, LoopEvent, MessageSendEvent,
SignalEvent, SignalSendEvent, SleepEvent, SubWorkflowEvent,
ActivityEvent, Event, EventData, EventType, LoopEvent, MessageSendEvent, SignalEvent,
SignalSendEvent, SleepEvent, SubWorkflowEvent,
},
location::{Coordinate, Location},
removed::Removed,
Expand Down Expand Up @@ -187,7 +187,7 @@ impl Cursor {
pub fn compare_activity(
&self,
version: usize,
event_id: &EventId,
name: &str,
) -> WorkflowResult<HistoryResult<&ActivityEvent>> {
if let Some(event) = self.current_event() {
if version > event.version {
Expand All @@ -200,7 +200,7 @@ impl Cursor {
event.data,
event.version,
self.current_location(),
event_id.name,
name,
version,
)));
}
Expand All @@ -211,18 +211,16 @@ impl Cursor {
"expected {} at {}, found activity {:?}",
event.data,
self.current_location(),
event_id.name
name,
)));
};

if &activity.event_id != event_id {
if &activity.name != name {
return Err(WorkflowError::HistoryDiverged(format!(
"expected activity {:?}#{:x} at {}, found activity {:?}#{:x}",
activity.event_id.name,
activity.event_id.input_hash,
"expected activity {:?} at {}, found activity {:?}",
activity.name,
self.current_location(),
event_id.name,
event_id.input_hash,
name,
)));
}

Expand Down Expand Up @@ -536,7 +534,7 @@ impl Cursor {
match T::event_type() {
EventType::Activity => {
if let EventData::Activity(activity) = &event.data {
T::name().expect("bad impl") == activity.event_id.name
T::name().expect("bad impl") == activity.name
} else {
false
}
Expand Down
Loading
Loading