Skip to content

Commit b9be317

Browse files
committed
fix(gas): remove input hash check
1 parent 45a2522 commit b9be317

File tree

13 files changed

+68
-182
lines changed

13 files changed

+68
-182
lines changed

packages/common/gasoline/core/src/ctx/workflow.rs

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use crate::{
2525
history::{
2626
History,
2727
cursor::{Cursor, HistoryResult},
28-
event::{EventId, SleepState},
28+
event::SleepState,
2929
location::{Coordinate, Location},
3030
removed::Removed,
3131
},
@@ -281,7 +281,6 @@ impl WorkflowCtx {
281281
async fn run_activity<A: Activity>(
282282
&mut self,
283283
input: &A::Input,
284-
event_id: &EventId,
285284
location: &Location,
286285
create_ts: i64,
287286
) -> WorkflowResult<A::Output> {
@@ -328,7 +327,7 @@ impl WorkflowCtx {
328327
self.workflow_id,
329328
location,
330329
self.version,
331-
event_id,
330+
A::NAME,
332331
create_ts,
333332
&input_val,
334333
Ok(&output_val),
@@ -376,7 +375,7 @@ impl WorkflowCtx {
376375
self.workflow_id,
377376
location,
378377
self.version,
379-
event_id,
378+
A::NAME,
380379
create_ts,
381380
&input_val,
382381
Err(&err_str),
@@ -423,7 +422,7 @@ impl WorkflowCtx {
423422
self.workflow_id,
424423
location,
425424
self.version,
426-
event_id,
425+
A::NAME,
427426
create_ts,
428427
&input_val,
429428
Err(&err_str),
@@ -576,9 +575,9 @@ impl WorkflowCtx {
576575
{
577576
self.check_stop()?;
578577

579-
let event_id = EventId::new(I::Activity::NAME, &input);
580-
581-
let history_res = self.cursor.compare_activity(self.version, &event_id)?;
578+
let history_res = self
579+
.cursor
580+
.compare_activity(self.version, I::Activity::NAME)?;
582581
let location = self.cursor.current_location_for(&history_res);
583582

584583
// Activity was ran before
@@ -605,7 +604,7 @@ impl WorkflowCtx {
605604
}
606605

607606
match self
608-
.run_activity::<I::Activity>(&input, &event_id, &location, activity.create_ts)
607+
.run_activity::<I::Activity>(&input, &location, activity.create_ts)
609608
.await
610609
{
611610
Err(err) => {
@@ -647,13 +646,8 @@ impl WorkflowCtx {
647646
}
648647
// This is a new activity
649648
else {
650-
self.run_activity::<I::Activity>(
651-
&input,
652-
&event_id,
653-
&location,
654-
rivet_util::timestamp::now(),
655-
)
656-
.await?
649+
self.run_activity::<I::Activity>(&input, &location, rivet_util::timestamp::now())
650+
.await?
657651
};
658652

659653
// Move to next event

packages/common/gasoline/core/src/db/kv/debug.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -849,13 +849,6 @@ impl DatabaseDebug for DatabaseKv {
849849
.unpack::<keys::history::OutputChunkKey>(entry.key())
850850
{
851851
current_event.output_chunks.push(entry);
852-
} else if let Ok(key) = self
853-
.subspace
854-
.unpack::<keys::history::InputHashKey>(entry.key())
855-
{
856-
let input_hash = key.deserialize(entry.value())?;
857-
858-
current_event.input_hash = Some(input_hash);
859852
} else if let Ok(key) =
860853
self.subspace.unpack::<keys::history::ErrorKey>(entry.key())
861854
{
@@ -1191,7 +1184,6 @@ struct WorkflowHistoryEventBuilder {
11911184
input_chunks: Vec<Value>,
11921185
output_chunks: Vec<Value>,
11931186
tags: Vec<(String, String)>,
1194-
input_hash: Option<Vec<u8>>,
11951187
errors: Vec<ActivityError>,
11961188
iteration: Option<usize>,
11971189
deadline_ts: Option<i64>,
@@ -1213,7 +1205,6 @@ impl WorkflowHistoryEventBuilder {
12131205
input_chunks: Vec::new(),
12141206
output_chunks: Vec::new(),
12151207
tags: Vec::new(),
1216-
input_hash: None,
12171208
errors: Vec::new(),
12181209
iteration: None,
12191210
deadline_ts: None,

packages/common/gasoline/core/src/db/kv/keys/history.rs

Lines changed: 0 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -763,68 +763,6 @@ impl<'de> TupleUnpack<'de> for OutputChunkKey {
763763
}
764764
}
765765

766-
#[derive(Debug)]
767-
pub struct InputHashKey {
768-
workflow_id: Id,
769-
location: Location,
770-
forgotten: bool,
771-
}
772-
773-
impl InputHashKey {
774-
pub fn new(workflow_id: Id, location: Location) -> Self {
775-
InputHashKey {
776-
workflow_id,
777-
location,
778-
forgotten: false,
779-
}
780-
}
781-
}
782-
783-
impl FormalKey for InputHashKey {
784-
type Value = Vec<u8>;
785-
786-
fn deserialize(&self, raw: &[u8]) -> Result<Self::Value> {
787-
Ok(raw.to_vec())
788-
}
789-
790-
fn serialize(&self, value: Self::Value) -> Result<Vec<u8>> {
791-
Ok(value)
792-
}
793-
}
794-
795-
impl TuplePack for InputHashKey {
796-
fn pack<W: std::io::Write>(
797-
&self,
798-
w: &mut W,
799-
tuple_depth: TupleDepth,
800-
) -> std::io::Result<VersionstampOffset> {
801-
pack_history_key(
802-
self.workflow_id,
803-
&self.location,
804-
w,
805-
tuple_depth,
806-
self.forgotten,
807-
INPUT_HASH,
808-
)
809-
}
810-
}
811-
812-
impl<'de> TupleUnpack<'de> for InputHashKey {
813-
fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> {
814-
let (input, (workflow_id, location, forgotten)) =
815-
unpack_history_key(input, tuple_depth, INPUT_HASH, "INPUT_HASH")?;
816-
817-
Ok((
818-
input,
819-
InputHashKey {
820-
workflow_id,
821-
location,
822-
forgotten,
823-
},
824-
))
825-
}
826-
}
827-
828766
#[derive(Debug)]
829767
pub struct ErrorKey {
830768
workflow_id: Id,
@@ -1511,7 +1449,6 @@ pub mod insert {
15111449
version: usize,
15121450
create_ts: i64,
15131451
activity_name: &str,
1514-
input_hash: &[u8],
15151452
input: &serde_json::value::RawValue,
15161453
res: std::result::Result<&serde_json::value::RawValue, &str>,
15171454
) -> Result<()> {
@@ -1531,12 +1468,6 @@ pub mod insert {
15311468
&activity_name_key.serialize(activity_name.to_string())?,
15321469
);
15331470

1534-
let input_hash_key = super::InputHashKey::new(workflow_id, location.clone());
1535-
tx.set(
1536-
&subspace.pack(&input_hash_key),
1537-
&input_hash_key.serialize(input_hash.to_vec())?,
1538-
);
1539-
15401471
let input_key = super::InputKey::new(workflow_id, location.clone());
15411472

15421473
// Write input

packages/common/gasoline/core/src/db/kv/mod.rs

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ use crate::{
2828
error::{WorkflowError, WorkflowResult},
2929
history::{
3030
event::{
31-
ActivityEvent, Event, EventData, EventId, EventType, LoopEvent, MessageSendEvent,
32-
RemovedEvent, SignalEvent, SignalSendEvent, SleepEvent, SleepState, SubWorkflowEvent,
31+
ActivityEvent, Event, EventData, EventType, LoopEvent, MessageSendEvent, RemovedEvent,
32+
SignalEvent, SignalSendEvent, SleepEvent, SleepState, SubWorkflowEvent,
3333
},
3434
location::Location,
3535
},
@@ -1333,13 +1333,6 @@ impl Database for DatabaseKv {
13331333
entry.key(),
13341334
) {
13351335
current_event.output_chunks.push(entry);
1336-
} else if let Ok(key) =
1337-
self.subspace.unpack::<keys::history::InputHashKey>(
1338-
entry.key(),
1339-
) {
1340-
let input_hash = key.deserialize(entry.value())?;
1341-
1342-
current_event.input_hash = Some(input_hash);
13431336
} else if let Ok(_key) =
13441337
self.subspace
13451338
.unpack::<keys::history::ErrorKey>(entry.key())
@@ -2336,7 +2329,7 @@ impl Database for DatabaseKv {
23362329
from_workflow_id: Id,
23372330
location: &Location,
23382331
version: usize,
2339-
event_id: &EventId,
2332+
name: &str,
23402333
create_ts: i64,
23412334
input: &serde_json::value::RawValue,
23422335
res: Result<&serde_json::value::RawValue, &str>,
@@ -2353,8 +2346,7 @@ impl Database for DatabaseKv {
23532346
location,
23542347
version,
23552348
create_ts,
2356-
&event_id.name,
2357-
&event_id.input_hash.to_be_bytes(),
2349+
name,
23582350
input,
23592351
res,
23602352
)?;
@@ -2718,7 +2710,6 @@ struct WorkflowHistoryEventBuilder {
27182710
sub_workflow_id: Option<Id>,
27192711
input_chunks: Vec<Value>,
27202712
output_chunks: Vec<Value>,
2721-
input_hash: Option<Vec<u8>>,
27222713
error_count: usize,
27232714
iteration: Option<usize>,
27242715
deadline_ts: Option<i64>,
@@ -2738,7 +2729,6 @@ impl WorkflowHistoryEventBuilder {
27382729
sub_workflow_id: None,
27392730
input_chunks: Vec::new(),
27402731
output_chunks: Vec::new(),
2741-
input_hash: None,
27422732
error_count: 0,
27432733
iteration: None,
27442734
deadline_ts: None,
@@ -2786,12 +2776,7 @@ impl TryFrom<WorkflowHistoryEventBuilder> for ActivityEvent {
27862776

27872777
fn try_from(value: WorkflowHistoryEventBuilder) -> WorkflowResult<Self> {
27882778
Ok(ActivityEvent {
2789-
event_id: EventId::from_be_bytes(
2790-
value.name.ok_or(WorkflowError::MissingEventData("name"))?,
2791-
value
2792-
.input_hash
2793-
.ok_or(WorkflowError::MissingEventData("hash"))?,
2794-
)?,
2779+
name: value.name.ok_or(WorkflowError::MissingEventData("name"))?,
27952780
create_ts: value
27962781
.create_ts
27972782
.ok_or(WorkflowError::MissingEventData("create_ts"))?,

packages/common/gasoline/core/src/db/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use serde::de::DeserializeOwned;
88
use crate::{
99
error::{WorkflowError, WorkflowResult},
1010
history::{
11-
event::{Event, EventId, EventType, SleepState},
11+
event::{Event, EventType, SleepState},
1212
location::Location,
1313
},
1414
workflow::Workflow,
@@ -205,7 +205,7 @@ pub trait Database: Send {
205205
workflow_id: Id,
206206
location: &Location,
207207
version: usize,
208-
event_id: &EventId,
208+
name: &str,
209209
create_ts: i64,
210210
input: &serde_json::value::RawValue,
211211
output: Result<&serde_json::value::RawValue, &str>,

packages/common/gasoline/core/src/history/cursor.rs

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ use std::cmp::Ordering;
33
use super::{
44
History,
55
event::{
6-
ActivityEvent, Event, EventData, EventId, EventType, LoopEvent, MessageSendEvent,
7-
SignalEvent, SignalSendEvent, SleepEvent, SubWorkflowEvent,
6+
ActivityEvent, Event, EventData, EventType, LoopEvent, MessageSendEvent, SignalEvent,
7+
SignalSendEvent, SleepEvent, SubWorkflowEvent,
88
},
99
location::{Coordinate, Location},
1010
removed::Removed,
@@ -187,7 +187,7 @@ impl Cursor {
187187
pub fn compare_activity(
188188
&self,
189189
version: usize,
190-
event_id: &EventId,
190+
name: &str,
191191
) -> WorkflowResult<HistoryResult<&ActivityEvent>> {
192192
if let Some(event) = self.current_event() {
193193
if version > event.version {
@@ -200,7 +200,7 @@ impl Cursor {
200200
event.data,
201201
event.version,
202202
self.current_location(),
203-
event_id.name,
203+
name,
204204
version,
205205
)));
206206
}
@@ -211,18 +211,16 @@ impl Cursor {
211211
"expected {} at {}, found activity {:?}",
212212
event.data,
213213
self.current_location(),
214-
event_id.name
214+
name,
215215
)));
216216
};
217217

218-
if &activity.event_id != event_id {
218+
if &activity.name != name {
219219
return Err(WorkflowError::HistoryDiverged(format!(
220-
"expected activity {:?}#{:x} at {}, found activity {:?}#{:x}",
221-
activity.event_id.name,
222-
activity.event_id.input_hash,
220+
"expected activity {:?} at {}, found activity {:?}",
221+
activity.name,
223222
self.current_location(),
224-
event_id.name,
225-
event_id.input_hash,
223+
name,
226224
)));
227225
}
228226

@@ -536,7 +534,7 @@ impl Cursor {
536534
match T::event_type() {
537535
EventType::Activity => {
538536
if let EventData::Activity(activity) = &event.data {
539-
T::name().expect("bad impl") == activity.event_id.name
537+
T::name().expect("bad impl") == activity.name
540538
} else {
541539
false
542540
}

0 commit comments

Comments
 (0)