Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
250 changes: 249 additions & 1 deletion ethexe/processor/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::*;
use anyhow::{Result, anyhow};
use ethexe_common::{
DEFAULT_BLOCK_GAS_LIMIT, OUTGOING_MESSAGES_SOFT_LIMIT, PROGRAM_MODIFICATIONS_SOFT_LIMIT,
ScheduledTask, SimpleBlockData,
PrivateKey, ScheduledTask, SignedMessage, SimpleBlockData,
db::*,
events::{
BlockRequestEvent, MirrorRequestEvent, RouterRequestEvent,
Expand Down Expand Up @@ -1502,6 +1502,254 @@ async fn call_wake_with_delay_is_unsupported() {
assert_eq!(reply_code, ReplyCode::Success(SuccessReplyReason::Auto));
}

/// Tests that process_programs phases execute in strict order:
/// 1. Injected transactions + events (populate queues)
/// 2. Scheduled tasks (e.g., WakeMessage moves dispatches back to queues)
/// 3. Queue processing (injected queue first, then canonical queue)
///
/// Uses `process_programs` to exercise the full pipeline.
#[tokio::test(flavor = "multi_thread")]
async fn injected_and_events_then_tasks_then_queues() {
    init_logger();

    // Test program (WAT). Linear-memory layout used below:
    //   0x000..0x004 — "DONE" reply payload (data segment)
    //   0x200        — "already called" flag; wasm memory is zero-initialized,
    //                  so the very first execution sees 0 here
    //   0x400        — value pointer passed to gr_reply (zeroed memory => value 0)
    //   0x600        — out-pointer for gr_reply's resulting message id
    //
    // Handle function:
    // - First call: sets flag and waits for 1 block
    // - Subsequent calls: replies with "DONE"
    let wat = r#"
        (module
            (import "env" "memory" (memory 1))
            (import "env" "gr_reply" (func $reply (param i32 i32 i32 i32)))
            (import "env" "gr_wait_for" (func $wait_for (param i32)))
            (export "handle" (func $handle))
            (func $handle
                (if
                    (i32.eqz (i32.load (i32.const 0x200)))
                    (then
                        (i32.store (i32.const 0x200) (i32.const 1))
                        (call $wait_for (i32.const 1))
                    )
                    (else
                        (call $reply (i32.const 0) (i32.const 4) (i32.const 0x400) (i32.const 0x600))
                    )
                )
            )
            (data (i32.const 0) "DONE")
        )
    "#;

    let (_, code) = wat_to_wasm(wat);
    let (mut processor, chain, [code_id]) = setup_test_env_and_load_codes([code.as_slice()]);

    // Three distinct message sources so each reply in the output can be
    // attributed to the phase that enqueued its originating message.
    let task_user = ActorId::from(10);
    let injected_user_pk = PrivateKey::random();
    let canonical_user = ActorId::from(30);
    let actor_id = ActorId::from(0x10000);

    // === Block 1: Create program, init, and send a message that will wait ===
    let block1 = chain.blocks[1].to_simple();

    let create_and_init_events = vec![
        BlockRequestEvent::Router(RouterRequestEvent::ProgramCreated(ProgramCreatedEvent {
            actor_id,
            code_id,
        })),
        BlockRequestEvent::Mirror {
            actor_id,
            // Fund the program so its executions are not starved of gas.
            event: MirrorRequestEvent::ExecutableBalanceTopUpRequested(
                ExecutableBalanceTopUpRequestedEvent {
                    value: 500_000_000_000,
                },
            ),
        },
        BlockRequestEvent::Mirror {
            actor_id,
            // First handle call: flips the flag and parks in wait_for(1).
            event: MirrorRequestEvent::MessageQueueingRequested(MessageQueueingRequestedEvent {
                id: MessageId::from(1),
                source: task_user,
                payload: vec![],
                value: 0,
                call_reply: false,
            }),
        },
    ];

    let executable = ExecutableData {
        block: block1,
        events: create_and_init_events,
        gas_allowance: Some(DEFAULT_BLOCK_GAS_LIMIT),
        ..Default::default()
    };
    let FinalizedBlockTransitions {
        states,
        schedule,
        program_creations,
        ..
    } = processor.process_programs(executable, None).await.unwrap();
    // Persist program -> code-id mappings so later blocks can execute the program.
    program_creations
        .into_iter()
        .for_each(|(pid, cid)| processor.db.set_program_code_id(pid, cid));

    // === Block 2: Send handle message that triggers wait_for(1) ===
    let block2 = chain.blocks[2].to_simple();

    let wait_message_event = vec![BlockRequestEvent::Mirror {
        actor_id,
        event: MirrorRequestEvent::MessageQueueingRequested(MessageQueueingRequestedEvent {
            id: MessageId::from(2),
            source: task_user,
            payload: vec![],
            value: 0,
            call_reply: false,
        }),
    }];

    // NOTE(review): `block2` is moved into `ExecutableData` here but read again
    // below for `wake_block_height` — this relies on `SimpleBlockData` being
    // `Copy` (or the field being otherwise non-consuming); confirm against its
    // definition.
    let executable = ExecutableData {
        block: block2,
        program_states: states,
        schedule,
        events: wait_message_event,
        gas_allowance: Some(DEFAULT_BLOCK_GAS_LIMIT),
        ..Default::default()
    };
    let FinalizedBlockTransitions {
        states, schedule, ..
    } = processor.process_programs(executable, None).await.unwrap();

    // Verify WakeMessage is scheduled for block3
    let wake_block_height = block2.header.height + 1;
    assert!(
        schedule.contains_key(&wake_block_height),
        "WakeMessage must be scheduled for block {wake_block_height}"
    );

    // === Block 3: All three phases via process_programs ===
    // - Phase 1 (injected + events): injected tx + canonical message populate queues
    // - Phase 2 (tasks): WakeMessage moves waiting dispatch back to canonical queue
    // - Phase 3 (queues): injected queue first, then canonical queue
    let block3 = chain.blocks[3].to_simple();
    // Sanity: the wake task scheduled in block 2 must fire exactly at block 3.
    assert_eq!(block3.header.height, wake_block_height);

    // NOTE(review): `reference_block` is a random hash here — presumably
    // reference-block validation is not exercised by this path; verify that
    // `process_programs` does not reject injected txs on an unknown reference.
    let injected_tx = InjectedTransaction {
        destination: actor_id,
        payload: vec![].try_into().unwrap(),
        value: 0,
        reference_block: H256::random(),
        salt: H256::random().0.to_vec().try_into().unwrap(),
    };
    // Sign and verify the injected tx; the signer's address is the expected
    // destination of the injected reply.
    let signed_injected = SignedMessage::create(injected_user_pk, injected_tx).unwrap();
    let verified_injected = signed_injected.into_verified();
    let injected_user: ActorId = verified_injected.address().into();

    let canonical_event = vec![BlockRequestEvent::Mirror {
        actor_id,
        event: MirrorRequestEvent::MessageQueueingRequested(MessageQueueingRequestedEvent {
            id: MessageId::from(3),
            source: canonical_user,
            payload: vec![],
            value: 0,
            call_reply: false,
        }),
    }];

    // Channel over which the processor reports promises for injected txs.
    let (promise_out_tx, mut promise_receiver) = mpsc::unbounded_channel();

    let executable = ExecutableData {
        block: block3,
        program_states: states,
        schedule,
        injected_transactions: vec![verified_injected],
        events: canonical_event,
        gas_allowance: Some(DEFAULT_BLOCK_GAS_LIMIT),
    };
    let FinalizedBlockTransitions { transitions, .. } = processor
        .process_programs(executable, Some(promise_out_tx))
        .await
        .unwrap();

    // === Verify three-phase ordering via process_programs output ===
    //
    // The single `process_programs` call runs three phases internally:
    //   Phase 1: handle_injected_and_events — pushes injected tx to injected queue,
    //            pushes canonical event message to canonical queue
    //   Phase 2: process_tasks — WakeMessage moves waiting dispatch to canonical queue
    //   Phase 3: process_queues — executes injected queue first, then canonical queue
    //
    // We prove ordering by observing the output:
    //
    // ASSERT: Injected+events ran BEFORE queues.
    //   If queues ran first, no messages would be in queues → 0 replies.
    //   We get 3 replies ⇒ events populated queues before they were processed.
    //
    // ASSERT: Tasks ran BEFORE queues.
    //   If queues ran before tasks, the WakeMessage wouldn't have moved the waiting
    //   dispatch back into the canonical queue → only 2 replies (injected +
    //   canonical event). We get 3 replies ⇒ tasks ran before queue processing.
    //
    // ASSERT: Injected+events ran BEFORE tasks (canonical queue is FIFO).
    //   The event message (phase 1) is queued to canonical queue before the woken
    //   dispatch (phase 2). So in output, event reply appears before task-woken
    //   reply. If tasks ran before events, the woken dispatch would be first in
    //   canonical queue, and we'd see task_user reply before canonical_user reply.
    //
    // ASSERT: Injected queue processed BEFORE canonical queue.
    //   Injected reply is first in output, before any canonical replies.

    // Injected message must produce a promise (proves it was executed)
    let promise = promise_receiver
        .recv()
        .await
        .expect("promise must be sent for injected transaction");
    // `Manual` because the program called gr_reply explicitly (not an auto reply).
    assert_eq!(promise.reply.payload, b"DONE");
    assert_eq!(
        promise.reply.code,
        ReplyCode::Success(SuccessReplyReason::Manual)
    );

    // Collect all outgoing messages from the single process_programs call,
    // preserving the processor's emission order.
    let to_users: Vec<_> = transitions
        .iter()
        .flat_map(|t| t.messages.iter().map(|m| (&t.actor_id, m)))
        .collect();

    // --- ASSERT: All three sources produced replies ---
    // This proves both injected+events AND tasks ran BEFORE queue processing.
    // If queues ran before injected+events: 0 replies (nothing in queues).
    // If queues ran before tasks: 2 replies (no woken dispatch in canonical queue).
    assert_eq!(
        to_users.len(),
        3,
        "All 3 sources (injected, canonical event, task-woken) must produce replies. \
        Missing replies means queue processing ran before injected+events or tasks."
    );

    // --- ASSERT: Injected queue processed BEFORE canonical queue ---
    // The injected reply must come first because process_queues runs
    // injected queues before canonical queues.
    assert_eq!(
        to_users[0].1.destination, injected_user,
        "Injected reply must come first: injected queue is processed before canonical queue"
    );
    assert_eq!(to_users[0].1.payload, b"DONE");

    // --- ASSERT: Events (phase 1) ran BEFORE tasks (phase 2) ---
    // Canonical queue is FIFO. The event message was queued in phase 1 and the woken
    // dispatch was queued in phase 2. So the event reply MUST appear before the task reply.
    // If tasks ran first, task_user reply would appear at position 1, not position 2.
    assert_eq!(
        to_users[1].1.destination, canonical_user,
        "Canonical event reply must come before task-woken reply: \
        events (phase 1) must populate canonical queue before tasks (phase 2) add woken dispatches"
    );
    assert_eq!(to_users[1].1.payload, b"DONE");

    assert_eq!(
        to_users[2].1.destination, task_user,
        "Task-woken reply must come last in canonical queue: \
        tasks (phase 2) enqueue woken dispatch after events (phase 1) already queued their messages"
    );
    assert_eq!(to_users[2].1.payload, b"DONE");
}

#[tokio::test(flavor = "multi_thread")]
async fn call_wait_up_to_with_huge_duration() {
init_logger();
Expand Down
Loading